1 package org.apache.helix;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.I0Itec.zkclient.IZkStateListener;
28 import org.I0Itec.zkclient.ZkConnection;
29 import org.I0Itec.zkclient.ZkServer;
30 import org.apache.helix.PropertyKey.Builder;
31 import org.apache.helix.controller.pipeline.Pipeline;
32 import org.apache.helix.controller.pipeline.Stage;
33 import org.apache.helix.controller.pipeline.StageContext;
34 import org.apache.helix.controller.stages.ClusterEvent;
35 import org.apache.helix.manager.zk.ZKHelixAdmin;
36 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
37 import org.apache.helix.manager.zk.ZNRecordSerializer;
38 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
39 import org.apache.helix.manager.zk.ZkClient;
40 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
41 import org.apache.helix.model.IdealState;
42 import org.apache.helix.model.InstanceConfig;
43 import org.apache.helix.model.LiveInstance;
44 import org.apache.helix.model.Message;
45 import org.apache.helix.model.StateModelDefinition;
46 import org.apache.helix.model.IdealState.IdealStateModeProperty;
47 import org.apache.helix.model.Message.Attributes;
48 import org.apache.helix.model.Message.MessageType;
49 import org.apache.helix.tools.StateModelConfigGenerator;
50 import org.apache.helix.util.HelixUtil;
51 import org.apache.log4j.Logger;
52 import org.apache.zookeeper.WatchedEvent;
53 import org.apache.zookeeper.Watcher;
54 import org.apache.zookeeper.Watcher.Event.KeeperState;
55 import org.apache.zookeeper.ZooKeeper;
56 import org.testng.Assert;
57 import org.testng.AssertJUnit;
58 import org.testng.annotations.AfterSuite;
59 import org.testng.annotations.BeforeSuite;
60
61
62
63 public class ZkUnitTestBase
64 {
65 private static Logger LOG = Logger.getLogger(ZkUnitTestBase.class);
66 protected static ZkServer _zkServer = null;
67 protected static ZkClient _gZkClient;
68
69 public static final String ZK_ADDR = "localhost:2185";
70 protected static final String CLUSTER_PREFIX = "CLUSTER";
71 protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
72
73 @BeforeSuite(alwaysRun = true)
74 public void beforeSuite() throws Exception
75 {
76 _zkServer = TestHelper.startZkSever(ZK_ADDR);
77 AssertJUnit.assertTrue(_zkServer != null);
78
79
80
81
82 _gZkClient = new ZkClient(ZK_ADDR);
83 _gZkClient.setZkSerializer(new ZNRecordSerializer());
84 }
85
86 @AfterSuite(alwaysRun = true)
87 public void afterTest()
88 {
89 _gZkClient.close();
90 TestHelper.stopZkServer(_zkServer);
91 _zkServer = null;
92
93
94
95
96 }
97
98 protected String getShortClassName()
99 {
100 String className = this.getClass().getName();
101 return className.substring(className.lastIndexOf('.') + 1);
102 }
103
104 protected String getCurrentLeader(ZkClient zkClient, String clusterName)
105 {
106 String leaderPath =
107 HelixUtil.getControllerPropertyPath(clusterName, PropertyType.LEADER);
108 ZNRecord leaderRecord = zkClient.<ZNRecord> readData(leaderPath);
109 if (leaderRecord == null)
110 {
111 return null;
112 }
113
114 String leader = leaderRecord.getSimpleField(PropertyType.LEADER.toString());
115 return leader;
116 }
117
118 protected void stopCurrentLeader(ZkClient zkClient,
119 String clusterName,
120 Map<String, Thread> threadMap,
121 Map<String, HelixManager> managerMap)
122 {
123 String leader = getCurrentLeader(zkClient, clusterName);
124 Assert.assertTrue(leader != null);
125 System.out.println("stop leader:" + leader + " in " + clusterName);
126 Assert.assertTrue(leader != null);
127
128 HelixManager manager = managerMap.remove(leader);
129 Assert.assertTrue(manager != null);
130 manager.disconnect();
131
132 Thread thread = threadMap.remove(leader);
133 Assert.assertTrue(thread != null);
134 thread.interrupt();
135
136 boolean isNewLeaderElected = false;
137 try
138 {
139
140 for (int i = 0; i < 5; i++)
141 {
142 Thread.sleep(1000);
143 String newLeader = getCurrentLeader(zkClient, clusterName);
144 if (!newLeader.equals(leader))
145 {
146 isNewLeaderElected = true;
147 System.out.println("new leader elected: " + newLeader + " in " + clusterName);
148 break;
149 }
150 }
151 }
152 catch (InterruptedException e)
153 {
154 e.printStackTrace();
155 }
156 if (isNewLeaderElected == false)
157 {
158 System.out.println("fail to elect a new leader elected in " + clusterName);
159 }
160 AssertJUnit.assertTrue(isNewLeaderElected);
161 }
162
163 public void verifyInstance(ZkClient zkClient,
164 String clusterName,
165 String instance,
166 boolean wantExists)
167 {
168
169 String instanceConfigsPath =
170 PropertyPathConfig.getPath(PropertyType.CONFIGS,
171 clusterName,
172 ConfigScopeProperty.PARTICIPANT.toString());
173 String instanceConfigPath = instanceConfigsPath + "/" + instance;
174 String instancePath = HelixUtil.getInstancePath(clusterName, instance);
175 AssertJUnit.assertEquals(wantExists, zkClient.exists(instanceConfigPath));
176 AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath));
177 }
178
179 public void verifyResource(ZkClient zkClient,
180 String clusterName,
181 String resource,
182 boolean wantExists)
183 {
184 String resourcePath = HelixUtil.getIdealStatePath(clusterName) + "/" + resource;
185 AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath));
186 }
187
188 public void verifyEnabled(ZkClient zkClient,
189 String clusterName,
190 String instance,
191 boolean wantEnabled)
192 {
193 ZKHelixDataAccessor accessor =
194 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
195 Builder keyBuilder = accessor.keyBuilder();
196
197 InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instance));
198 AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled());
199 }
200
201 public void verifyReplication(ZkClient zkClient,
202 String clusterName,
203 String resource,
204 int repl)
205 {
206 ZKHelixDataAccessor accessor =
207 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
208 Builder keyBuilder = accessor.keyBuilder();
209
210 IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource));
211 for (String partitionName : idealState.getPartitionSet())
212 {
213 if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO)
214 {
215 AssertJUnit.assertEquals(repl, idealState.getPreferenceList(partitionName).size());
216 }
217 else if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
218 {
219 AssertJUnit.assertEquals(repl, idealState.getInstanceStateMap(partitionName)
220 .size());
221 }
222 }
223 }
224
225 protected void simulateSessionExpiry(ZkConnection zkConnection) throws IOException,
226 InterruptedException
227 {
228 ZooKeeper oldZookeeper = zkConnection.getZookeeper();
229 LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
230
231 Watcher watcher = new Watcher()
232 {
233 @Override
234 public void process(WatchedEvent event)
235 {
236 LOG.info("In New connection, process event:" + event);
237 }
238 };
239
240 ZooKeeper newZookeeper =
241 new ZooKeeper(zkConnection.getServers(),
242 oldZookeeper.getSessionTimeout(),
243 watcher,
244 oldZookeeper.getSessionId(),
245 oldZookeeper.getSessionPasswd());
246 LOG.info("New sessionId = " + newZookeeper.getSessionId());
247
248 newZookeeper.close();
249 Thread.sleep(10000);
250 oldZookeeper = zkConnection.getZookeeper();
251 LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
252 }
253
254 protected void simulateSessionExpiry(ZkClient zkClient) throws IOException,
255 InterruptedException
256 {
257 IZkStateListener listener = new IZkStateListener()
258 {
259 @Override
260 public void handleStateChanged(KeeperState state) throws Exception
261 {
262 LOG.info("In Old connection, state changed:" + state);
263 }
264
265 @Override
266 public void handleNewSession() throws Exception
267 {
268 LOG.info("In Old connection, new session");
269 }
270 };
271 zkClient.subscribeStateChanges(listener);
272 ZkConnection connection = ((ZkConnection) zkClient.getConnection());
273 ZooKeeper oldZookeeper = connection.getZookeeper();
274 LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
275
276 Watcher watcher = new Watcher()
277 {
278 @Override
279 public void process(WatchedEvent event)
280 {
281 LOG.info("In New connection, process event:" + event);
282 }
283 };
284
285 ZooKeeper newZookeeper =
286 new ZooKeeper(connection.getServers(),
287 oldZookeeper.getSessionTimeout(),
288 watcher,
289 oldZookeeper.getSessionId(),
290 oldZookeeper.getSessionPasswd());
291 LOG.info("New sessionId = " + newZookeeper.getSessionId());
292
293 newZookeeper.close();
294 Thread.sleep(10000);
295 connection = (ZkConnection) zkClient.getConnection();
296 oldZookeeper = connection.getZookeeper();
297 LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
298 }
299
300 protected void setupStateModel(String clusterName)
301 {
302 ZKHelixDataAccessor accessor =
303 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
304 Builder keyBuilder = accessor.keyBuilder();
305
306 StateModelConfigGenerator generator = new StateModelConfigGenerator();
307 StateModelDefinition masterSlave =
308 new StateModelDefinition(generator.generateConfigForMasterSlave());
309 accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave);
310
311 StateModelDefinition leaderStandby =
312 new StateModelDefinition(generator.generateConfigForLeaderStandby());
313 accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby);
314
315 StateModelDefinition onlineOffline =
316 new StateModelDefinition(generator.generateConfigForOnlineOffline());
317 accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline);
318
319 }
320
321 protected List<IdealState> setupIdealState(String clusterName,
322 int[] nodes,
323 String[] resources,
324 int partitions,
325 int replicas)
326 {
327 ZKHelixDataAccessor accessor =
328 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
329 Builder keyBuilder = accessor.keyBuilder();
330
331 List<IdealState> idealStates = new ArrayList<IdealState>();
332 List<String> instances = new ArrayList<String>();
333 for (int i : nodes)
334 {
335 instances.add("localhost_" + i);
336 }
337
338 for (String resourceName : resources)
339 {
340 IdealState idealState = new IdealState(resourceName);
341 for (int p = 0; p < partitions; p++)
342 {
343 List<String> value = new ArrayList<String>();
344 for (int r = 0; r < replicas; r++)
345 {
346 int n = nodes[(p + r) % nodes.length];
347 value.add("localhost_" + n);
348 }
349 idealState.getRecord().setListField(resourceName + "_" + p, value);
350 }
351
352 idealState.setReplicas(Integer.toString(replicas));
353 idealState.setStateModelDefRef("MasterSlave");
354 idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
355 idealState.setNumPartitions(partitions);
356 idealStates.add(idealState);
357
358
359 accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
360 }
361 return idealStates;
362 }
363
364 protected void setupLiveInstances(String clusterName, int[] liveInstances)
365 {
366 ZKHelixDataAccessor accessor =
367 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
368 Builder keyBuilder = accessor.keyBuilder();
369
370 for (int i = 0; i < liveInstances.length; i++)
371 {
372 String instance = "localhost_" + liveInstances[i];
373 LiveInstance liveInstance = new LiveInstance(instance);
374 liveInstance.setSessionId("session_" + liveInstances[i]);
375 liveInstance.setHelixVersion("0.0.0");
376 accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
377 }
378 }
379
380 protected void setupInstances(String clusterName, int[] instances)
381 {
382 HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
383 for (int i = 0; i < instances.length; i++)
384 {
385 String instance = "localhost_" + instances[i];
386 InstanceConfig instanceConfig = new InstanceConfig(instance);
387 instanceConfig.setHostName("localhost");
388 instanceConfig.setPort("" + instances[i]);
389 instanceConfig.setInstanceEnabled(true);
390 admin.addInstance(clusterName, instanceConfig);
391 }
392 }
393
394 protected void runPipeline(ClusterEvent event, Pipeline pipeline)
395 {
396 try
397 {
398 pipeline.handle(event);
399 pipeline.finish();
400 }
401 catch (Exception e)
402 {
403 LOG.error("Exception while executing pipeline:" + pipeline
404 + ". Will not continue to next pipeline", e);
405 }
406 }
407
408 protected void runStage(ClusterEvent event, Stage stage) throws Exception
409 {
410 StageContext context = new StageContext();
411 stage.init(context);
412 stage.preProcess();
413 stage.process(event);
414 stage.postProcess();
415 }
416
417 protected Message createMessage(MessageType type,
418 String msgId,
419 String fromState,
420 String toState,
421 String resourceName,
422 String tgtName)
423 {
424 Message msg = new Message(type.toString(), msgId);
425 msg.setFromState(fromState);
426 msg.setToState(toState);
427 msg.getRecord().setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName);
428 msg.setTgtName(tgtName);
429 return msg;
430 }
431
432 }