1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Map.Entry;
28 import java.util.Random;
29 import java.util.concurrent.ConcurrentHashMap;
30
31 import org.apache.helix.HelixManager;
32 import org.apache.helix.PropertyPathConfig;
33 import org.apache.helix.PropertyType;
34 import org.apache.helix.TestHelper;
35 import org.apache.helix.ZNRecord;
36 import org.apache.helix.TestHelper.StartCMResult;
37 import org.apache.helix.controller.HelixControllerMain;
38 import org.apache.helix.manager.zk.ZNRecordSerializer;
39 import org.apache.helix.manager.zk.ZkClient;
40 import org.apache.helix.model.IdealState.IdealStateModeProperty;
41 import org.apache.helix.model.IdealState.IdealStateProperty;
42 import org.apache.helix.store.PropertyJsonSerializer;
43 import org.apache.helix.store.PropertyStoreException;
44 import org.apache.helix.tools.ClusterSetup;
45 import org.apache.helix.tools.ClusterStateVerifier;
46 import org.apache.helix.tools.DefaultIdealStateCalculator;
47 import org.apache.helix.tools.TestCommand;
48 import org.apache.helix.tools.TestExecutor;
49 import org.apache.helix.tools.TestTrigger;
50 import org.apache.helix.tools.ZnodeOpArg;
51 import org.apache.helix.tools.TestCommand.CommandType;
52 import org.apache.helix.tools.TestCommand.NodeOpArg;
53 import org.apache.helix.tools.TestExecutor.ZnodePropertyType;
54 import org.apache.log4j.Logger;
55 import org.testng.Assert;
56
57
58 public class TestDriver
59 {
60 private static Logger LOG = Logger.getLogger(TestDriver.class);
61 private static final String ZK_ADDR = ZkIntegrationTestBase.ZK_ADDR;
62
63
64 private static final String STATE_MODEL = "MasterSlave";
65 private static final String TEST_DB_PREFIX = "TestDB";
66 private static final int START_PORT = 12918;
67 private static final String CONTROLLER_PREFIX = "controller";
68 private static final String PARTICIPANT_PREFIX = "localhost";
69 private static final Random RANDOM = new Random();
70 private static final PropertyJsonSerializer<ZNRecord> SERIALIZER = new PropertyJsonSerializer<ZNRecord>(
71 ZNRecord.class);
72
73 private static final Map<String, TestInfo> _testInfoMap = new ConcurrentHashMap<String, TestInfo>();
74
75 public static class TestInfo
76 {
77 public final ZkClient _zkClient;
78 public final String _clusterName;
79 public final int _numDb;
80 public final int _numPartitionsPerDb;
81 public final int _numNode;
82 public final int _replica;
83
84
85
86 public final Map<String, StartCMResult> _startCMResultMap = new ConcurrentHashMap<String, StartCMResult>();
87
88 public TestInfo(String clusterName, ZkClient zkClient, int numDb, int numPartitionsPerDb,
89 int numNode, int replica)
90 {
91 this._clusterName = clusterName;
92 this._zkClient = zkClient;
93 this._numDb = numDb;
94 this._numPartitionsPerDb = numPartitionsPerDb;
95 this._numNode = numNode;
96 this._replica = replica;
97 }
98 }
99
100 public static TestInfo getTestInfo(String uniqClusterName)
101 {
102 if (!_testInfoMap.containsKey(uniqClusterName))
103 {
104 String errMsg = "Cluster hasn't been setup for " + uniqClusterName;
105 throw new IllegalArgumentException(errMsg);
106 }
107
108 TestInfo testInfo = _testInfoMap.get(uniqClusterName);
109 return testInfo;
110 }
111
112 public static void setupClusterWithoutRebalance(String uniqClusterName, String zkAddr,
113 int numResources, int numPartitionsPerResource, int numInstances, int replica)
114 throws Exception
115 {
116 setupCluster(uniqClusterName, zkAddr, numResources, numPartitionsPerResource, numInstances,
117 replica, false);
118 }
119
120 public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
121 int numPartitionsPerResource, int numInstances, int replica) throws Exception
122 {
123 setupCluster(uniqClusterName, zkAddr, numResources, numPartitionsPerResource, numInstances,
124 replica, true);
125 }
126
127
128
129
130
131 public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
132 int numPartitionsPerResource, int numInstances, int replica, boolean doRebalance)
133 throws Exception
134 {
135 ZkClient zkClient = new ZkClient(zkAddr);
136 zkClient.setZkSerializer(new ZNRecordSerializer());
137
138
139 String clusterName = uniqClusterName;
140 if (zkClient.exists("/" + clusterName))
141 {
142 LOG.warn("test cluster already exists:" + clusterName + ", test name:" + uniqClusterName
143 + " is not unique or test has been run without cleaning up zk; deleting it");
144 zkClient.deleteRecursive("/" + clusterName);
145 }
146
147 if (_testInfoMap.containsKey(uniqClusterName))
148 {
149 LOG.warn("test info already exists:" + uniqClusterName
150 + " is not unique or test has been run without cleaning up test info map; removing it");
151 _testInfoMap.remove(uniqClusterName);
152 }
153 TestInfo testInfo = new TestInfo(clusterName, zkClient, numResources, numPartitionsPerResource,
154 numInstances, replica);
155 _testInfoMap.put(uniqClusterName, testInfo);
156
157 ClusterSetup setupTool = new ClusterSetup(zkAddr);
158 setupTool.addCluster(clusterName, true);
159
160 for (int i = 0; i < numInstances; i++)
161 {
162 int port = START_PORT + i;
163 setupTool.addInstanceToCluster(clusterName, PARTICIPANT_PREFIX + "_" + port);
164 }
165
166 for (int i = 0; i < numResources; i++)
167 {
168 String dbName = TEST_DB_PREFIX + i;
169 setupTool.addResourceToCluster(clusterName, dbName, numPartitionsPerResource,
170 STATE_MODEL);
171 if (doRebalance)
172 {
173 setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
174
175
176
177
178
179
180 }
181 }
182 }
183
184
185
186
187
188
189
190 public static void startDummyParticipant(String uniqClusterName, int instanceId) throws Exception
191 {
192 startDummyParticipants(uniqClusterName, new int[] { instanceId });
193 }
194
195 public static void startDummyParticipants(String uniqClusterName, int[] instanceIds)
196 throws Exception
197 {
198 if (!_testInfoMap.containsKey(uniqClusterName))
199 {
200 String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
201 throw new IllegalArgumentException(errMsg);
202 }
203
204 TestInfo testInfo = _testInfoMap.get(uniqClusterName);
205 String clusterName = testInfo._clusterName;
206
207 for (int id : instanceIds)
208 {
209 String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + id);
210
211 if (testInfo._startCMResultMap.containsKey(instanceName))
212 {
213 LOG.warn("Dummy participant:" + instanceName + " has already started; skip starting it");
214 } else
215 {
216 StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, clusterName, instanceName);
217 testInfo._startCMResultMap.put(instanceName, result);
218
219 }
220 }
221 }
222
223 public static void startController(String uniqClusterName) throws Exception
224 {
225 startController(uniqClusterName, new int[] { 0 });
226 }
227
228 public static void startController(String uniqClusterName, int[] nodeIds) throws Exception
229 {
230 if (!_testInfoMap.containsKey(uniqClusterName))
231 {
232 String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
233 throw new IllegalArgumentException(errMsg);
234 }
235
236 TestInfo testInfo = _testInfoMap.get(uniqClusterName);
237 String clusterName = testInfo._clusterName;
238
239 for (int id : nodeIds)
240 {
241 String controllerName = CONTROLLER_PREFIX + "_" + id;
242 if (testInfo._startCMResultMap.containsKey(controllerName))
243 {
244 LOG.warn("Controller:" + controllerName + " has already started; skip starting it");
245 } else
246 {
247 StartCMResult result = TestHelper.startController(clusterName, controllerName, ZK_ADDR,
248 HelixControllerMain.STANDALONE);
249 testInfo._startCMResultMap.put(controllerName, result);
250 }
251 }
252 }
253
254 public static void verifyCluster(String uniqClusterName, long beginTime, long timeout)
255 throws Exception
256 {
257 Thread.sleep(beginTime);
258
259 if (!_testInfoMap.containsKey(uniqClusterName))
260 {
261 String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
262 throw new IllegalArgumentException(errMsg);
263 }
264
265 TestInfo testInfo = _testInfoMap.get(uniqClusterName);
266 String clusterName = testInfo._clusterName;
267
268 boolean result = ClusterStateVerifier.verifyByPolling(
269 new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName), timeout);
270 Assert.assertTrue(result);
271 }
272
273 public static void stopCluster(String uniqClusterName) throws Exception
274 {
275 if (!_testInfoMap.containsKey(uniqClusterName))
276 {
277 String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
278 throw new IllegalArgumentException(errMsg);
279 }
280 TestInfo testInfo = _testInfoMap.remove(uniqClusterName);
281
282
283 for (Iterator<Entry<String, StartCMResult>> it = testInfo._startCMResultMap.entrySet()
284 .iterator(); it.hasNext();)
285 {
286 Map.Entry<String, StartCMResult> entry = it.next();
287 String instanceName = entry.getKey();
288 if (instanceName.startsWith(CONTROLLER_PREFIX))
289 {
290 it.remove();
291 HelixManager manager = entry.getValue()._manager;
292 manager.disconnect();
293 Thread thread = entry.getValue()._thread;
294 thread.interrupt();
295 }
296 }
297
298 Thread.sleep(1000);
299
300
301 for (Map.Entry<String, StartCMResult> entry : testInfo._startCMResultMap.entrySet())
302 {
303 HelixManager manager = entry.getValue()._manager;
304 manager.disconnect();
305 Thread thread = entry.getValue()._thread;
306 thread.interrupt();
307 }
308
309 testInfo._zkClient.close();
310 }
311
312 public static void stopDummyParticipant(String uniqClusterName, long beginTime, int instanceId)
313 throws Exception
314 {
315 if (!_testInfoMap.containsKey(uniqClusterName))
316 {
317
318 String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
319 throw new Exception(errMsg);
320 }
321
322 TestInfo testInfo = _testInfoMap.get(uniqClusterName);
323
324
325 String failHost = PARTICIPANT_PREFIX + "_" + (START_PORT + instanceId);
326 StartCMResult result = testInfo._startCMResultMap.remove(failHost);
327
328
329 if (result == null || result._manager == null || result._thread == null)
330 {
331 String errMsg = "Dummy participant:" + failHost + " seems not running";
332 LOG.error(errMsg);
333 } else
334 {
335
336
337 NodeOpArg arg = new NodeOpArg(result._manager, result._thread);
338 TestCommand command = new TestCommand(CommandType.STOP, new TestTrigger(beginTime), arg);
339 List<TestCommand> commandList = new ArrayList<TestCommand>();
340 commandList.add(command);
341 TestExecutor.executeTestAsync(commandList, ZK_ADDR);
342 }
343 }
344
345 public static void setIdealState(String uniqClusterName, long beginTime, int percentage)
346 throws Exception
347 {
348 if (!_testInfoMap.containsKey(uniqClusterName))
349 {
350 String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
351 throw new IllegalArgumentException(errMsg);
352 }
353 TestInfo testInfo = _testInfoMap.get(uniqClusterName);
354 String clusterName = testInfo._clusterName;
355 List<String> instanceNames = new ArrayList<String>();
356
357 for (int i = 0; i < testInfo._numNode; i++)
358 {
359 int port = START_PORT + i;
360 instanceNames.add(PARTICIPANT_PREFIX + "_" + port);
361 }
362
363 List<TestCommand> commandList = new ArrayList<TestCommand>();
364 for (int i = 0; i < testInfo._numDb; i++)
365 {
366 String dbName = TEST_DB_PREFIX + i;
367 ZNRecord destIS = DefaultIdealStateCalculator.calculateIdealState(instanceNames,
368 testInfo._numPartitionsPerDb, testInfo._replica - 1, dbName, "MASTER", "SLAVE");
369
370 destIS.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(),
371 IdealStateModeProperty.CUSTOMIZED.toString());
372 destIS.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
373 Integer.toString(testInfo._numPartitionsPerDb));
374 destIS.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), STATE_MODEL);
375 destIS.setSimpleField(IdealStateProperty.REPLICAS.toString(), "" + testInfo._replica);
376
377
378
379 ZNRecord initIS = new ZNRecord(dbName);
380
381 initIS.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(),
382 IdealStateModeProperty.CUSTOMIZED.toString());
383 initIS.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
384 Integer.toString(testInfo._numPartitionsPerDb));
385 initIS.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), STATE_MODEL);
386 initIS.setSimpleField(IdealStateProperty.REPLICAS.toString(), "" + testInfo._replica);
387 int totalStep = calcuateNumTransitions(initIS, destIS);
388
389
390
391
392
393
394 ZNRecord nextIS;
395 int step = totalStep * percentage / 100;
396 System.out.println("Resource:" + dbName + ", totalSteps from initIS to destIS:" + totalStep
397 + ", walk " + step + " steps(" + percentage + "%)");
398 nextIS = nextIdealState(initIS, destIS, step);
399
400 String idealStatePath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName,
401 TEST_DB_PREFIX + i);
402 ZnodeOpArg arg = new ZnodeOpArg(idealStatePath, ZnodePropertyType.ZNODE, "+", nextIS);
403 TestCommand command = new TestCommand(CommandType.MODIFY, new TestTrigger(beginTime), arg);
404 commandList.add(command);
405 }
406
407 TestExecutor.executeTestAsync(commandList, ZK_ADDR);
408
409 }
410
411 private static List<String[]> findAllUnfinishPairs(ZNRecord cur, ZNRecord dest)
412 {
413
414 List<String[]> list = new ArrayList<String[]>();
415 Map<String, Map<String, String>> map = dest.getMapFields();
416 for (Map.Entry<String, Map<String, String>> entry : map.entrySet())
417 {
418 String partitionName = entry.getKey();
419 Map<String, String> hostMap = entry.getValue();
420 for (Map.Entry<String, String> hostEntry : hostMap.entrySet())
421 {
422 String host = hostEntry.getKey();
423 String destState = hostEntry.getValue();
424 Map<String, String> curHostMap = cur.getMapField(partitionName);
425
426 String curState = null;
427 if (curHostMap != null)
428 {
429 curState = curHostMap.get(host);
430 }
431
432 String[] pair = new String[3];
433 if (curState == null)
434 {
435 if (destState.equalsIgnoreCase("SLAVE"))
436 {
437 pair[0] = new String(partitionName);
438 pair[1] = new String(host);
439 pair[2] = new String("1");
440 list.add(pair);
441 } else if (destState.equalsIgnoreCase("MASTER"))
442 {
443 pair[0] = new String(partitionName);
444 pair[1] = new String(host);
445 pair[2] = new String("2");
446 list.add(pair);
447 }
448 } else
449 {
450 if (curState.equalsIgnoreCase("SLAVE") && destState.equalsIgnoreCase("MASTER"))
451 {
452 pair[0] = new String(partitionName);
453 pair[1] = new String(host);
454 pair[2] = new String("1");
455 list.add(pair);
456 }
457 }
458 }
459 }
460 return list;
461 }
462
463 private static int calcuateNumTransitions(ZNRecord start, ZNRecord end)
464 {
465 int totalSteps = 0;
466 List<String[]> list = findAllUnfinishPairs(start, end);
467 for (String[] pair : list)
468 {
469 totalSteps += Integer.parseInt(pair[2]);
470 }
471 return totalSteps;
472 }
473
474 private static ZNRecord nextIdealState(final ZNRecord cur, final ZNRecord dest, final int steps)
475 throws PropertyStoreException
476 {
477
478 ZNRecord next = SERIALIZER.deserialize(SERIALIZER.serialize(cur));
479 List<String[]> list = findAllUnfinishPairs(cur, dest);
480
481
482
483 for (int i = 0; i < steps; i++)
484 {
485 int randomInt = RANDOM.nextInt(list.size());
486 String[] pair = list.get(randomInt);
487 String curState = null;
488 Map<String, String> curHostMap = next.getMapField(pair[0]);
489 if (curHostMap != null)
490 {
491 curState = curHostMap.get(pair[1]);
492 }
493 final String destState = dest.getMapField(pair[0]).get(pair[1]);
494
495
496 if (curState == null && destState != null)
497 {
498 Map<String, String> hostMap = next.getMapField(pair[0]);
499 if (hostMap == null)
500 {
501 hostMap = new HashMap<String, String>();
502 }
503 hostMap.put(pair[1], "SLAVE");
504 next.setMapField(pair[0], hostMap);
505 } else if (curState.equalsIgnoreCase("SLAVE") && destState != null
506 && destState.equalsIgnoreCase("MASTER"))
507 {
508 next.getMapField(pair[0]).put(pair[1], "MASTER");
509 } else
510 {
511 LOG.error("fail to calculate the next ideal state");
512 }
513 curState = next.getMapField(pair[0]).get(pair[1]);
514 if (curState != null && curState.equalsIgnoreCase(destState))
515 {
516 list.remove(randomInt);
517 }
518 }
519
520 LOG.info("nextIS:" + next);
521 return next;
522 }
523 }