1 package org.apache.helix.mock.participant;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.CountDownLatch;
27
28 import org.I0Itec.zkclient.DataUpdater;
29 import org.I0Itec.zkclient.exception.ZkNoNodeException;
30 import org.apache.helix.AccessOption;
31 import org.apache.helix.HelixManager;
32 import org.apache.helix.HelixManagerFactory;
33 import org.apache.helix.InstanceType;
34 import org.apache.helix.NotificationContext;
35 import org.apache.helix.ZNRecord;
36 import org.apache.helix.ZkHelixTestManager;
37 import org.apache.helix.mock.participant.DummyProcess.DummyLeaderStandbyStateModelFactory;
38 import org.apache.helix.mock.participant.DummyProcess.DummyOnlineOfflineStateModelFactory;
39 import org.apache.helix.model.Message;
40 import org.apache.helix.participant.StateMachineEngine;
41 import org.apache.helix.participant.statemachine.StateModel;
42 import org.apache.helix.participant.statemachine.StateModelFactory;
43 import org.apache.helix.participant.statemachine.StateModelInfo;
44 import org.apache.helix.participant.statemachine.Transition;
45 import org.apache.helix.store.zk.ZkHelixPropertyStore;
46 import org.apache.log4j.Logger;
47
48
49 public class MockParticipant extends Thread
50 {
51 private static Logger LOG =
52 Logger.getLogger(MockParticipant.class);
53 private final String _clusterName;
54 private final String _instanceName;
55
56
57 private final CountDownLatch _startCountDown = new CountDownLatch(1);
58 private final CountDownLatch _stopCountDown = new CountDownLatch(1);
59 private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
60
61 private final ZkHelixTestManager _manager;
62 private final StateModelFactory _msModelFactory;
63 private final MockJobIntf _job;
64
65 public MockParticipant(String clusterName, String instanceName, String zkAddr) throws Exception
66 {
67 this(clusterName, instanceName, zkAddr, null, null);
68 }
69
70 public MockParticipant(String clusterName,
71 String instanceName,
72 String zkAddr,
73 MockTransition transition) throws Exception
74 {
75 this(clusterName, instanceName, zkAddr, transition, null);
76 }
77
78 public MockParticipant(String clusterName,
79 String instanceName,
80 String zkAddr,
81 MockTransition transition,
82 MockJobIntf job) throws Exception
83 {
84 _clusterName = clusterName;
85 _instanceName = instanceName;
86 _msModelFactory = new MockMSModelFactory(transition);
87
88 _manager = new ZkHelixTestManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, zkAddr);
89 _job = job;
90 }
91
92 public MockParticipant(StateModelFactory factory,
93 String clusterName,
94 String instanceName,
95 String zkAddr,
96 MockJobIntf job) throws Exception
97 {
98 _clusterName = clusterName;
99 _instanceName = instanceName;
100 _msModelFactory = factory;
101
102 _manager = new ZkHelixTestManager(_clusterName, _instanceName, InstanceType.PARTICIPANT, zkAddr);
103 _job = job;
104 }
105
106 public StateModelFactory getStateModelFactory()
107 {
108 return _msModelFactory;
109 }
110
111 public MockParticipant(ZkHelixTestManager manager, MockTransition transition)
112 {
113 _clusterName = manager.getClusterName();
114 _instanceName = manager.getInstanceName();
115 _manager = manager;
116
117 _msModelFactory = new MockMSModelFactory(transition);
118 _job = null;
119 }
120
121 public void setTransition(MockTransition transition)
122 {
123 if (_msModelFactory instanceof MockMSModelFactory)
124 {
125 ((MockMSModelFactory) _msModelFactory).setTrasition(transition);
126 }
127 }
128
129 public ZkHelixTestManager getManager()
130 {
131 return _manager;
132 }
133
134 public String getInstanceName()
135 {
136 return _instanceName;
137 }
138
139 public String getClusterName()
140 {
141 return _clusterName;
142 }
143
144 public void syncStop()
145 {
146 _stopCountDown.countDown();
147 try
148 {
149 _waitStopFinishCountDown.await();
150 }
151 catch (InterruptedException e)
152 {
153
154 e.printStackTrace();
155 }
156
157
158
159
160
161 }
162
163 public void syncStart()
164 {
165 super.start();
166 try
167 {
168 _startCountDown.await();
169 }
170 catch (InterruptedException e)
171 {
172
173 e.printStackTrace();
174 }
175 }
176
177 @Override
178 public void run()
179 {
180 try
181 {
182 StateMachineEngine stateMach = _manager.getStateMachineEngine();
183 stateMach.registerStateModelFactory("MasterSlave", _msModelFactory);
184
185 DummyLeaderStandbyStateModelFactory lsModelFactory =
186 new DummyLeaderStandbyStateModelFactory(10);
187 DummyOnlineOfflineStateModelFactory ofModelFactory =
188 new DummyOnlineOfflineStateModelFactory(10);
189 stateMach.registerStateModelFactory("LeaderStandby", lsModelFactory);
190 stateMach.registerStateModelFactory("OnlineOffline", ofModelFactory);
191
192 MockSchemataModelFactory schemataFactory = new MockSchemataModelFactory();
193 stateMach.registerStateModelFactory("STORAGE_DEFAULT_SM_SCHEMATA", schemataFactory);
194
195
196
197 if (_job != null)
198 {
199 _job.doPreConnectJob(_manager);
200 }
201
202 _manager.connect();
203 _startCountDown.countDown();
204
205 if (_job != null)
206 {
207 _job.doPostConnectJob(_manager);
208 }
209
210 _stopCountDown.await();
211 }
212 catch (InterruptedException e)
213 {
214 String msg =
215 "participant: " + _instanceName + ", " + Thread.currentThread().getName()
216 + " is interrupted";
217 LOG.info(msg);
218 System.err.println(msg);
219 }
220 catch (Exception e)
221 {
222
223 e.printStackTrace();
224 }
225 finally
226 {
227 _startCountDown.countDown();
228
229 synchronized (_manager)
230 {
231 _manager.disconnect();
232 }
233 _waitStopFinishCountDown.countDown();
234 }
235 }
236 }