View Javadoc

1   package org.apache.helix.mock.participant;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.Map;
25  import java.util.Set;
26  import java.util.concurrent.CountDownLatch;
27  
28  import org.I0Itec.zkclient.DataUpdater;
29  import org.I0Itec.zkclient.exception.ZkNoNodeException;
30  import org.apache.helix.AccessOption;
31  import org.apache.helix.HelixManager;
32  import org.apache.helix.HelixManagerFactory;
33  import org.apache.helix.InstanceType;
34  import org.apache.helix.NotificationContext;
35  import org.apache.helix.ZNRecord;
36  import org.apache.helix.ZkHelixTestManager;
37  import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
38  import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
39  import org.apache.helix.model.Message;
40  import org.apache.helix.participant.StateMachineEngine;
41  import org.apache.helix.participant.statemachine.StateModel;
42  import org.apache.helix.participant.statemachine.StateModelFactory;
43  import org.apache.helix.participant.statemachine.StateModelInfo;
44  import org.apache.helix.participant.statemachine.Transition;
45  import org.apache.helix.store.zk.ZkHelixPropertyStore;
46  import org.apache.log4j.Logger;
47  
48  
49  public class MockParticipant extends Thread
50  {
51    private static Logger           LOG                      =
52                                                                 Logger.getLogger(MockParticipant.class);
53    private final String            _clusterName;
54    private final String            _instanceName;
55    // private final String _zkAddr;
56  
57    private final CountDownLatch    _startCountDown          = new CountDownLatch(1);
58    private final CountDownLatch    _stopCountDown           = new CountDownLatch(1);
59    private final CountDownLatch    _waitStopFinishCountDown = new CountDownLatch(1);
60  
61    private final ZkHelixTestManager _manager;
62    private final StateModelFactory _msModelFactory;
63    private final MockJobIntf       _job;
64  
65    public MockParticipant(String clusterName, String instanceName, String zkAddr) throws Exception
66    {
67      this(clusterName, instanceName, zkAddr, null, null);
68    }
69  
70    public MockParticipant(String clusterName,
71                           String instanceName,
72                           String zkAddr,
73                           MockTransition transition) throws Exception
74    {
75      this(clusterName, instanceName, zkAddr, transition, null);
76    }
77  
78    public MockParticipant(String clusterName,
79                           String instanceName,
80                           String zkAddr,
81                           MockTransition transition,
82                           MockJobIntf job) throws Exception
83    {
84      _clusterName = clusterName;
85      _instanceName = instanceName;
86      _msModelFactory = new MockMSModelFactory(transition);
87  
88      _manager = new ZkHelixTestManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, zkAddr);
89      _job = job;
90    }
91  
92    public MockParticipant(StateModelFactory factory,
93                           String clusterName,
94                           String instanceName,
95                           String zkAddr,
96                           MockJobIntf job) throws Exception
97    {
98      _clusterName = clusterName;
99      _instanceName = instanceName;
100     _msModelFactory = factory;
101 
102     _manager = new ZkHelixTestManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, zkAddr);
103     _job = job;
104   }
105 
106   public StateModelFactory getStateModelFactory()
107   {
108     return _msModelFactory;
109   }
110 
111   public MockParticipant(ZkHelixTestManager manager, MockTransition transition)
112   {
113     _clusterName = manager.getClusterName();
114     _instanceName = manager.getInstanceName();
115     _manager = manager;
116 
117     _msModelFactory = new MockMSModelFactory(transition);
118     _job = null;
119   }
120 
121   public void setTransition(MockTransition transition)
122   {
123     if (_msModelFactory instanceof MockMSModelFactory)
124     {
125       ((MockMSModelFactory) _msModelFactory).setTrasition(transition);
126     }
127   }
128 
129   public ZkHelixTestManager getManager()
130   {
131     return _manager;
132   }
133 
134   public String getInstanceName()
135   {
136     return _instanceName;
137   }
138 
139   public String getClusterName()
140   {
141     return _clusterName;
142   }
143 
144   public void syncStop()
145   {
146     _stopCountDown.countDown();
147     try
148     {
149       _waitStopFinishCountDown.await();
150     }
151     catch (InterruptedException e)
152     {
153       // TODO Auto-generated catch block
154       e.printStackTrace();
155     }
156 
157     // synchronized (_manager)
158     // {
159     // _manager.disconnect();
160     // }
161   }
162 
163   public void syncStart()
164   {
165     super.start();
166     try
167     {
168       _startCountDown.await();
169     }
170     catch (InterruptedException e)
171     {
172       // TODO Auto-generated catch block
173       e.printStackTrace();
174     }
175   }
176 
177   @Override
178   public void run()
179   {
180     try
181     {
182       StateMachineEngine stateMach = _manager.getStateMachineEngine();
183       stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
184 
185       DummyLeaderStandbyStateModelFactory lsModelFactory =
186           new DummyLeaderStandbyStateModelFactory(10);
187       DummyOnlineOfflineStateModelFactory ofModelFactory =
188           new DummyOnlineOfflineStateModelFactory(10);
189       stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
190       stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
191 
192       MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
193       stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
194       // MockBootstrapModelFactory bootstrapFactory = new MockBootstrapModelFactory();
195       // stateMach.registerStateModelFactory("Bootstrap", bootstrapFactory);
196 
197       if (_job != null)
198       {
199         _job.doPreConnectJob(_manager);
200       }
201 
202       _manager.connect();
203       _startCountDown.countDown();
204 
205       if (_job != null)
206       {
207         _job.doPostConnectJob(_manager);
208       }
209 
210       _stopCountDown.await();
211     }
212     catch (InterruptedException e)
213     {
214       String msg =
215           "participant: " + _instanceName + ", " + Thread.currentThread().getName()
216               + " is interrupted";
217       LOG.info(msg);
218       System.err.println(msg);
219     }
220     catch (Exception e)
221     {
222       // TODO Auto-generated catch block
223       e.printStackTrace();
224     }
225     finally
226     {
227       _startCountDown.countDown();
228 
229       synchronized (_manager)
230       {
231         _manager.disconnect();
232       }
233       _waitStopFinishCountDown.countDown();
234     }
235   }
236 }