1 package org.apache.helix;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.BufferedReader;
23 import java.io.File;
24 import java.io.IOException;
25 import java.io.InputStreamReader;
26 import java.io.PrintWriter;
27 import java.lang.reflect.Method;
28 import java.net.ServerSocket;
29 import java.net.Socket;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.TreeMap;
39 import java.util.TreeSet;
40 import java.util.concurrent.Callable;
41 import java.util.concurrent.ConcurrentHashMap;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.TimeUnit;
44
45 import org.I0Itec.zkclient.IDefaultNameSpace;
46 import org.I0Itec.zkclient.ZkServer;
47 import org.I0Itec.zkclient.exception.ZkNoNodeException;
48 import org.apache.commons.io.FileUtils;
49 import org.apache.helix.PropertyKey.Builder;
50 import org.apache.helix.controller.HelixControllerMain;
51 import org.apache.helix.manager.zk.ZKHelixAdmin;
52 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
53 import org.apache.helix.manager.zk.ZNRecordSerializer;
54 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
55 import org.apache.helix.manager.zk.ZkClient;
56 import org.apache.helix.model.CurrentState;
57 import org.apache.helix.model.ExternalView;
58 import org.apache.helix.model.IdealState.IdealStateModeProperty;
59 import org.apache.helix.model.Message;
60 import org.apache.helix.model.Message.MessageType;
61 import org.apache.helix.model.StateModelDefinition;
62 import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
63 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
64 import org.apache.helix.participant.StateMachineEngine;
65 import org.apache.helix.store.zk.ZNode;
66 import org.apache.helix.tools.ClusterSetup;
67 import org.apache.helix.util.ZKClientPool;
68 import org.apache.log4j.Logger;
69 import org.apache.zookeeper.data.Stat;
70 import org.testng.Assert;
71
72
73 public class TestHelper
74 {
75 private static final Logger LOG = Logger.getLogger(TestHelper.class);
76
77
78
79
80 public static int getRandomPort() throws IOException {
81 ServerSocket sock = new ServerSocket();
82 sock.bind(null);
83 int port = sock.getLocalPort();
84 sock.close();
85
86 return port;
87 }
88
89 static public ZkServer startZkSever(final String zkAddress) throws Exception
90 {
91 List<String> empty = Collections.emptyList();
92 return TestHelper.startZkSever(zkAddress, empty, true);
93 }
94
95 static public ZkServer startZkSever(final String zkAddress, final String rootNamespace) throws Exception
96 {
97 List<String> rootNamespaces = new ArrayList<String>();
98 rootNamespaces.add(rootNamespace);
99 return TestHelper.startZkSever(zkAddress, rootNamespaces, true);
100 }
101
102 static public ZkServer startZkSever(final String zkAddress,
103 final List<String> rootNamespaces) throws Exception {
104 return startZkSever(zkAddress, rootNamespaces, true);
105 }
106
107 static public ZkServer startZkSever(final String zkAddress,
108 final List<String> rootNamespaces, boolean overwrite) throws Exception
109 {
110 System.out.println("Start zookeeper at " + zkAddress + " in thread "
111 + Thread.currentThread().getName());
112
113 String zkDir = zkAddress.replace(':', '_');
114 final String logDir = "/tmp/" + zkDir + "/logs";
115 final String dataDir = "/tmp/" + zkDir + "/dataDir";
116 if (overwrite) {
117 FileUtils.deleteDirectory(new File(dataDir));
118 FileUtils.deleteDirectory(new File(logDir));
119 }
120 ZKClientPool.reset();
121
122 IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
123 {
124 @Override
125 public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient zkClient)
126 {
127 if (rootNamespaces == null) {
128 return;
129 }
130
131 for (String rootNamespace : rootNamespaces)
132 {
133 try
134 {
135 zkClient.deleteRecursive(rootNamespace);
136 }
137 catch (Exception e)
138 {
139 LOG.error("fail to deleteRecursive path:" + rootNamespace + "\nexception:"
140 + e);
141 }
142 }
143 }
144 };
145
146 int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
147 ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
148 zkServer.start();
149
150 return zkServer;
151 }
152
153 static public void stopZkServer(ZkServer zkServer)
154 {
155 if (zkServer != null)
156 {
157 zkServer.shutdown();
158 System.out.println("Shut down zookeeper at port " + zkServer.getPort()
159 + " in thread " + Thread.currentThread().getName());
160 }
161 }
162
163 public static StartCMResult startDummyProcess(final String zkAddr,
164 final String clusterName,
165 final String instanceName) throws Exception
166 {
167 StartCMResult result = new StartCMResult();
168 ZkHelixTestManager manager = null;
169 manager = new ZkHelixTestManager(clusterName,
170 instanceName,
171 InstanceType.PARTICIPANT,
172 zkAddr);
173 result._manager = manager;
174 Thread thread = new Thread(new DummyProcessThread(manager, instanceName));
175 result._thread = thread;
176 thread.start();
177
178 return result;
179 }
180
181 private static ZkHelixTestManager startHelixController(final String zkConnectString,
182 final String clusterName, final String controllerName, final String controllerMode)
183 {
184 ZkHelixTestManager manager = null;
185 try
186 {
187 if (controllerMode.equalsIgnoreCase(HelixControllerMain.STANDALONE))
188 {
189 manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER, zkConnectString);
190 manager.connect();
191 } else if (controllerMode.equalsIgnoreCase(HelixControllerMain.DISTRIBUTED))
192 {
193 manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
194
195 DistClusterControllerStateModelFactory stateModelFactory = new DistClusterControllerStateModelFactory(
196 zkConnectString);
197
198 StateMachineEngine stateMach = manager.getStateMachineEngine();
199 stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
200 manager.connect();
201 } else
202 {
203 LOG.error("cluster controller mode:" + controllerMode + " NOT supported");
204 }
205 } catch (Exception e)
206 {
207
208 e.printStackTrace();
209 }
210
211 return manager;
212 }
213
214
215 public static StartCMResult startController(final String clusterName,
216 final String controllerName,
217 final String zkConnectString,
218 final String controllerMode) throws Exception
219 {
220 final StartCMResult result = new StartCMResult();
221 final ZkHelixTestManager manager = startHelixController(zkConnectString,
222 clusterName,
223 controllerName,
224 controllerMode);
225 result._manager = manager;
226
227 Thread thread = new Thread(new Runnable()
228 {
229 @Override
230 public void run()
231 {
232
233
234 try
235 {
236
237 Thread.currentThread().join();
238 }
239 catch (InterruptedException e)
240 {
241 String msg =
242 "controller:" + controllerName + ", " + Thread.currentThread().getName()
243 + " interrupted";
244 LOG.info(msg);
245
246 }
247 catch (Exception e)
248 {
249 e.printStackTrace();
250 }
251 }
252 });
253
254 thread.start();
255 result._thread = thread;
256 return result;
257 }
258
259 public static class StartCMResult
260 {
261 public Thread _thread;
262 public ZkHelixTestManager _manager;
263
264 }
265
266 public static void setupEmptyCluster(ZkClient zkClient, String clusterName)
267 {
268 ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
269 admin.addCluster(clusterName, true);
270 }
271
272
273
274
275
276
277
278 public static <T> Set<T> setOf(T... s)
279 {
280 Set<T> set = new HashSet<T>(Arrays.asList(s));
281 return set;
282 }
283
284
285
286
287
288
289
290
291
292
293
294
295 public static void verifyWithTimeout(String verifierName, long timeout, Object... args)
296 {
297 final long sleepInterval = 1000;
298 final int loop = (int) (timeout / sleepInterval) + 1;
299 try
300 {
301 boolean result = false;
302 int i = 0;
303 for (; i < loop; i++)
304 {
305 Thread.sleep(sleepInterval);
306
307 result = (Boolean) TestHelper.getMethod(verifierName).invoke(null, args);
308
309 if (result == true)
310 {
311 break;
312 }
313 }
314
315
316
317
318 System.err.println(verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify "
319 + " (" + result + ")");
320 LOG.debug("args:" + Arrays.toString(args));
321
322
323 if (result == false)
324 {
325 LOG.error(verifierName + " fails");
326 LOG.error("args:" + Arrays.toString(args));
327 }
328
329 Assert.assertTrue(result);
330 }
331 catch (Exception e)
332 {
333
334 e.printStackTrace();
335 }
336 }
337
338 private static Method getMethod(String name)
339 {
340 Method[] methods = TestHelper.class.getMethods();
341 for (Method method : methods)
342 {
343 if (name.equals(method.getName()))
344 {
345 return method;
346 }
347 }
348 return null;
349 }
350
351 public static boolean verifyEmptyCurStateAndExtView(String clusterName,
352 String resourceName,
353 Set<String> instanceNames,
354 String zkAddr)
355 {
356 ZkClient zkClient = new ZkClient(zkAddr);
357 zkClient.setZkSerializer(new ZNRecordSerializer());
358
359 try
360 {
361 ZKHelixDataAccessor accessor =
362 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
363 Builder keyBuilder = accessor.keyBuilder();
364
365 for (String instanceName : instanceNames)
366 {
367 List<String> sessionIds =
368 accessor.getChildNames(keyBuilder.sessions(instanceName));
369
370 for (String sessionId : sessionIds)
371 {
372 CurrentState curState =
373 accessor.getProperty(keyBuilder.currentState(instanceName,
374 sessionId,
375 resourceName));
376
377 if (curState != null && curState.getRecord().getMapFields().size() != 0)
378 {
379 return false;
380 }
381 }
382
383 ExternalView extView =
384 accessor.getProperty(keyBuilder.externalView(resourceName));
385
386 if (extView != null && extView.getRecord().getMapFields().size() != 0)
387 {
388 return false;
389 }
390
391 }
392
393 return true;
394 }
395 finally
396 {
397 zkClient.close();
398 }
399 }
400
401 public static boolean verifyNotConnected(HelixManager manager)
402 {
403 return !manager.isConnected();
404 }
405
406 public static void setupCluster(String clusterName,
407 String zkAddr,
408 int startPort,
409 String participantNamePrefix,
410 String resourceNamePrefix,
411 int resourceNb,
412 int partitionNb,
413 int nodesNb,
414 int replica,
415 String stateModelDef,
416 boolean doRebalance) throws Exception
417 {
418 TestHelper.setupCluster(clusterName,
419 zkAddr,
420 startPort,
421 participantNamePrefix,
422 resourceNamePrefix,
423 resourceNb,
424 partitionNb,
425 nodesNb,
426 replica,
427 stateModelDef,
428 IdealStateModeProperty.AUTO,
429 doRebalance);
430 }
431
432 public static void setupCluster(String clusterName,
433 String ZkAddr,
434 int startPort,
435 String participantNamePrefix,
436 String resourceNamePrefix,
437 int resourceNb,
438 int partitionNb,
439 int nodesNb,
440 int replica,
441 String stateModelDef,
442 IdealStateModeProperty mode,
443 boolean doRebalance) throws Exception
444 {
445 ZkClient zkClient = new ZkClient(ZkAddr);
446 if (zkClient.exists("/" + clusterName))
447 {
448 LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
449 zkClient.deleteRecursive("/" + clusterName);
450 }
451
452 ClusterSetup setupTool = new ClusterSetup(ZkAddr);
453 setupTool.addCluster(clusterName, true);
454
455 for (int i = 0; i < nodesNb; i++)
456 {
457 int port = startPort + i;
458 setupTool.addInstanceToCluster(clusterName, participantNamePrefix + "_" + port);
459 }
460
461 for (int i = 0; i < resourceNb; i++)
462 {
463 String resourceName = resourceNamePrefix + i;
464 setupTool.addResourceToCluster(clusterName, resourceName, partitionNb, stateModelDef, mode.toString());
465 if (doRebalance)
466 {
467 setupTool.rebalanceStorageCluster(clusterName, resourceName, replica);
468 }
469 }
470 zkClient.close();
471 }
472
473
474
475
476
477
478
479
480 public static void verifyState(String clusterName,
481 String zkAddr,
482 Map<String, Set<String>> stateMap,
483 String state)
484 {
485 ZkClient zkClient = new ZkClient(zkAddr);
486 zkClient.setZkSerializer(new ZNRecordSerializer());
487
488 try
489 {
490 ZKHelixDataAccessor accessor =
491 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
492 Builder keyBuilder = accessor.keyBuilder();
493
494 for (String resGroupPartitionKey : stateMap.keySet())
495 {
496 Map<String, String> retMap = getResourceAndPartitionKey(resGroupPartitionKey);
497 String resGroup = retMap.get("RESOURCE");
498 String partitionKey = retMap.get("PARTITION");
499
500 ExternalView extView = accessor.getProperty(keyBuilder.externalView(resGroup));
501 for (String instance : stateMap.get(resGroupPartitionKey))
502 {
503 String actualState = extView.getStateMap(partitionKey).get(instance);
504 Assert.assertNotNull(actualState, "externalView doesn't contain state for "
505 + resGroup + "/" + partitionKey + " on " + instance + " (expect " + state
506 + ")");
507
508 Assert.assertEquals(actualState, state, "externalView for " + resGroup + "/"
509 + partitionKey + " on " + instance + " is " + actualState + " (expect "
510 + state + ")");
511 }
512 }
513 }
514 finally
515 {
516 zkClient.close();
517 }
518 }
519
520
521
522
523
524
525
526
527 private static Map<String, String> getResourceAndPartitionKey(String resourcePartition)
528 {
529 String resourceName;
530 String partitionName;
531 int idx = resourcePartition.indexOf('/');
532 if (idx > -1)
533 {
534 resourceName = resourcePartition.substring(0, idx);
535 partitionName = resourcePartition.substring(idx + 1);
536 }
537 else
538 {
539 idx = resourcePartition.lastIndexOf('_');
540 resourceName = resourcePartition.substring(0, idx);
541 partitionName = resourcePartition;
542 }
543
544 Map<String, String> retMap = new HashMap<String, String>();
545 retMap.put("RESOURCE", resourceName);
546 retMap.put("PARTITION", partitionName);
547 return retMap;
548 }
549
550 public static <T> Map<String, T> startThreadsConcurrently(final int nrThreads,
551 final Callable<T> method,
552 final long timeout)
553 {
554 final CountDownLatch startLatch = new CountDownLatch(1);
555 final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
556 final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
557 final List<Thread> threadList = new ArrayList<Thread>();
558
559 for (int i = 0; i < nrThreads; i++)
560 {
561 Thread thread = new Thread()
562 {
563 @Override
564 public void run()
565 {
566 try
567 {
568 boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
569 if (isTimeout)
570 {
571 LOG.error("Timeout while waiting for start latch");
572 }
573 }
574 catch (InterruptedException ex)
575 {
576 LOG.error("Interrupted while waiting for start latch");
577 }
578
579 try
580 {
581 T result = method.call();
582 if (result != null)
583 {
584 resultsMap.put("thread_" + this.getId(), result);
585 }
586 LOG.debug("result=" + result);
587 }
588 catch (Exception e)
589 {
590 LOG.error("Exeption in executing " + method.getClass().getName(), e);
591 }
592
593 finishCounter.countDown();
594 }
595 };
596 threadList.add(thread);
597 thread.start();
598 }
599 startLatch.countDown();
600
601
602 try
603 {
604 boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
605 if (isTimeout)
606 {
607 LOG.error("Timeout while waiting for finish latch. Interrupt all threads");
608 for (Thread thread : threadList)
609 {
610 thread.interrupt();
611 }
612 }
613 }
614 catch (InterruptedException e)
615 {
616 LOG.error("Interrupted while waiting for finish latch", e);
617 }
618
619 return resultsMap;
620 }
621
622 public static Message createMessage(String msgId,
623 String fromState,
624 String toState,
625 String tgtName,
626 String resourceName,
627 String partitionName)
628 {
629 Message msg = new Message(MessageType.STATE_TRANSITION, msgId);
630 msg.setFromState(fromState);
631 msg.setToState(toState);
632 msg.setTgtName(tgtName);
633 msg.setResourceName(resourceName);
634 msg.setPartitionName(partitionName);
635 msg.setStateModelDef("MasterSlave");
636
637 return msg;
638 }
639
640 public static String getTestMethodName()
641 {
642 StackTraceElement[] calls = Thread.currentThread().getStackTrace();
643 return calls[2].getMethodName();
644 }
645
646 public static String getTestClassName()
647 {
648 StackTraceElement[] calls = Thread.currentThread().getStackTrace();
649 String fullClassName = calls[2].getClassName();
650 return fullClassName.substring(fullClassName.lastIndexOf('.') + 1);
651 }
652
653 public static <T> Map<String, T> startThreadsConcurrently(final List<Callable<T>> methods,
654 final long timeout)
655 {
656 final int nrThreads = methods.size();
657 final CountDownLatch startLatch = new CountDownLatch(1);
658 final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
659 final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
660 final List<Thread> threadList = new ArrayList<Thread>();
661
662 for (int i = 0; i < nrThreads; i++)
663 {
664 final Callable<T> method = methods.get(i);
665
666 Thread thread = new Thread()
667 {
668 @Override
669 public void run()
670 {
671 try
672 {
673 boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
674 if (isTimeout)
675 {
676 LOG.error("Timeout while waiting for start latch");
677 }
678 }
679 catch (InterruptedException ex)
680 {
681 LOG.error("Interrupted while waiting for start latch");
682 }
683
684 try
685 {
686 T result = method.call();
687 if (result != null)
688 {
689 resultsMap.put("thread_" + this.getId(), result);
690 }
691 LOG.debug("result=" + result);
692 }
693 catch (Exception e)
694 {
695 LOG.error("Exeption in executing " + method.getClass().getName(), e);
696 }
697
698 finishCounter.countDown();
699 }
700 };
701 threadList.add(thread);
702 thread.start();
703 }
704 startLatch.countDown();
705
706
707 try
708 {
709 boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
710 if (isTimeout)
711 {
712 LOG.error("Timeout while waiting for finish latch. Interrupt all threads");
713 for (Thread thread : threadList)
714 {
715 thread.interrupt();
716 }
717 }
718 }
719 catch (InterruptedException e)
720 {
721 LOG.error("Interrupted while waiting for finish latch", e);
722 }
723
724 return resultsMap;
725 }
726
727 public static void printCache(Map<String, ZNode> cache)
728 {
729 System.out.println("START:Print cache");
730 TreeMap<String, ZNode> map = new TreeMap<String, ZNode>();
731 map.putAll(cache);
732
733 for (String key : map.keySet())
734 {
735 ZNode node = map.get(key);
736 TreeSet<String> childSet = new TreeSet<String>();
737 childSet.addAll(node.getChildSet());
738 System.out.print(key + "=" + node.getData() + ", " + childSet + ", "
739 + (node.getStat() == null ? "null\n" : node.getStat()));
740 }
741 System.out.println("END:Print cache");
742 }
743
744 public static void readZkRecursive(String path,
745 Map<String, ZNode> map,
746 ZkClient zkclient)
747 {
748 try
749 {
750 Stat stat = new Stat();
751 ZNRecord record = zkclient.readData(path, stat);
752 List<String> childNames = zkclient.getChildren(path);
753 ZNode node = new ZNode(path, record, stat);
754 node.addChildren(childNames);
755 map.put(path, node);
756
757 for (String childName : childNames)
758 {
759 String childPath = path + "/" + childName;
760 readZkRecursive(childPath, map, zkclient);
761 }
762 }
763 catch (ZkNoNodeException e)
764 {
765
766 }
767 }
768
769 public static void readZkRecursive(String path,
770 Map<String, ZNode> map,
771 BaseDataAccessor<ZNRecord> zkAccessor)
772 {
773 try
774 {
775 Stat stat = new Stat();
776 ZNRecord record = zkAccessor.get(path, stat, 0);
777 List<String> childNames = zkAccessor.getChildNames(path, 0);
778
779 ZNode node = new ZNode(path, record, stat);
780 node.addChildren(childNames);
781 map.put(path, node);
782
783 if (childNames != null && !childNames.isEmpty())
784 {
785 for (String childName : childNames)
786 {
787 String childPath = path + "/" + childName;
788 readZkRecursive(childPath, map, zkAccessor);
789 }
790 }
791 }
792 catch (ZkNoNodeException e)
793 {
794
795 }
796 }
797
798 public static boolean verifyZkCache(List<String> paths,
799 BaseDataAccessor<ZNRecord> zkAccessor,
800 ZkClient zkclient,
801 boolean needVerifyStat)
802 {
803
804 Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
805 Map<String, ZNode> cache = new HashMap<String, ZNode>();
806 for (String path : paths)
807 {
808 readZkRecursive(path, zkMap, zkclient);
809 readZkRecursive(path, cache, zkAccessor);
810 }
811
812
813 return verifyZkCache(paths, null, cache, zkMap, needVerifyStat);
814 }
815
816 public static boolean verifyZkCache(List<String> paths,
817 Map<String, ZNode> cache,
818 ZkClient zkclient,
819 boolean needVerifyStat)
820 {
821 return verifyZkCache(paths, null, cache, zkclient, needVerifyStat);
822 }
823
824 public static boolean verifyZkCache(List<String> paths,
825 List<String> pathsExcludeForStat,
826 Map<String, ZNode> cache,
827 ZkClient zkclient,
828 boolean needVerifyStat)
829 {
830
831 Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
832 for (String path : paths)
833 {
834 readZkRecursive(path, zkMap, zkclient);
835 }
836
837
838 return verifyZkCache(paths, pathsExcludeForStat, cache, zkMap, needVerifyStat);
839 }
840
841 public static boolean verifyZkCache(List<String> paths,
842 List<String> pathsExcludeForStat,
843 Map<String, ZNode> cache,
844 Map<String, ZNode> zkMap,
845 boolean needVerifyStat)
846 {
847
848 if (zkMap.size() != cache.size())
849 {
850 System.err.println("size mismatch: cacheSize: " + cache.size() + ", zkMapSize: "
851 + zkMap.size());
852 System.out.println("cache: (" + cache.size() + ")");
853 TestHelper.printCache(cache);
854
855 System.out.println("zkMap: (" + zkMap.size() + ")");
856 TestHelper.printCache(zkMap);
857
858 return false;
859 }
860
861
862 for (String path : cache.keySet())
863 {
864 ZNode cacheNode = cache.get(path);
865 ZNode zkNode = zkMap.get(path);
866
867 if (zkNode == null)
868 {
869
870 System.err.println("path: " + path + " in cache but not on zk: inCacheNode: "
871 + cacheNode);
872 return false;
873 }
874
875 if ((zkNode.getData() == null && cacheNode.getData() != null)
876 || (zkNode.getData() != null && cacheNode.getData() == null)
877 || (zkNode.getData() != null && cacheNode.getData() != null && !zkNode.getData()
878 .equals(cacheNode.getData())))
879 {
880
881 System.err.println("data mismatch on path: " + path + ", inCache: "
882 + cacheNode.getData() + ", onZk: " + zkNode.getData());
883 return false;
884 }
885
886 if ((zkNode.getChildSet() == null && cacheNode.getChildSet() != null)
887 || (zkNode.getChildSet() != null && cacheNode.getChildSet() == null)
888 || (zkNode.getChildSet() != null && cacheNode.getChildSet() != null && !zkNode.getChildSet()
889 .equals(cacheNode.getChildSet())))
890 {
891
892 System.err.println("childSet mismatch on path: " + path + ", inCache: "
893 + cacheNode.getChildSet() + ", onZk: " + zkNode.getChildSet());
894 return false;
895 }
896
897 if (needVerifyStat && pathsExcludeForStat != null
898 && !pathsExcludeForStat.contains(path))
899 {
900 if (cacheNode.getStat() == null || !zkNode.getStat().equals(cacheNode.getStat()))
901 {
902
903 System.err.println("Stat mismatch on path: " + path + ", inCache: "
904 + cacheNode.getStat() + ", onZk: " + zkNode.getStat());
905 return false;
906 }
907 }
908 }
909
910 return true;
911 }
912
913 public static StateModelDefinition generateStateModelDefForBootstrap()
914 {
915 ZNRecord record = new ZNRecord("Bootstrap");
916 record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "IDLE");
917 List<String> statePriorityList = new ArrayList<String>();
918 statePriorityList.add("ONLINE");
919 statePriorityList.add("BOOTSTRAP");
920 statePriorityList.add("OFFLINE");
921 statePriorityList.add("IDLE");
922 statePriorityList.add("DROPPED");
923 statePriorityList.add("ERROR");
924 record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
925 statePriorityList);
926 for (String state : statePriorityList)
927 {
928 String key = state + ".meta";
929 Map<String, String> metadata = new HashMap<String, String>();
930 if (state.equals("ONLINE"))
931 {
932 metadata.put("count", "R");
933 record.setMapField(key, metadata);
934 }
935 else if (state.equals("BOOTSTRAP"))
936 {
937 metadata.put("count", "-1");
938 record.setMapField(key, metadata);
939 }
940 else if (state.equals("OFFLINE"))
941 {
942 metadata.put("count", "-1");
943 record.setMapField(key, metadata);
944 }
945 else if (state.equals("IDLE"))
946 {
947 metadata.put("count", "-1");
948 record.setMapField(key, metadata);
949 }
950 else if (state.equals("DROPPED"))
951 {
952 metadata.put("count", "-1");
953 record.setMapField(key, metadata);
954 }
955 else if (state.equals("ERROR"))
956 {
957 metadata.put("count", "-1");
958 record.setMapField(key, metadata);
959 }
960 }
961
962 for (String state : statePriorityList)
963 {
964 String key = state + ".next";
965 if (state.equals("ONLINE"))
966 {
967 Map<String, String> metadata = new HashMap<String, String>();
968 metadata.put("BOOTSTRAP", "OFFLINE");
969 metadata.put("OFFLINE", "OFFLINE");
970 metadata.put("DROPPED", "OFFLINE");
971 metadata.put("IDLE", "OFFLINE");
972 record.setMapField(key, metadata);
973 }
974 else if (state.equals("BOOTSTRAP"))
975 {
976 Map<String, String> metadata = new HashMap<String, String>();
977 metadata.put("ONLINE", "ONLINE");
978 metadata.put("OFFLINE", "OFFLINE");
979 metadata.put("DROPPED", "OFFLINE");
980 metadata.put("IDLE", "OFFLINE");
981 record.setMapField(key, metadata);
982 }
983 else if (state.equals("OFFLINE"))
984 {
985 Map<String, String> metadata = new HashMap<String, String>();
986 metadata.put("ONLINE", "BOOTSTRAP");
987 metadata.put("BOOTSTRAP", "BOOTSTRAP");
988 metadata.put("DROPPED", "IDLE");
989 metadata.put("IDLE", "IDLE");
990 record.setMapField(key, metadata);
991 }
992 else if (state.equals("IDLE"))
993 {
994 Map<String, String> metadata = new HashMap<String, String>();
995 metadata.put("ONLINE", "OFFLINE");
996 metadata.put("BOOTSTRAP", "OFFLINE");
997 metadata.put("OFFLINE", "OFFLINE");
998 metadata.put("DROPPED", "DROPPED");
999 record.setMapField(key, metadata);
1000 }
1001 else if (state.equals("ERROR"))
1002 {
1003 Map<String, String> metadata = new HashMap<String, String>();
1004 metadata.put("IDLE", "IDLE");
1005 record.setMapField(key, metadata);
1006 }
1007 }
1008 List<String> stateTransitionPriorityList = new ArrayList<String>();
1009 stateTransitionPriorityList.add("ONLINE-OFFLINE");
1010 stateTransitionPriorityList.add("BOOTSTRAP-ONLINE");
1011 stateTransitionPriorityList.add("OFFLINE-BOOTSTRAP");
1012 stateTransitionPriorityList.add("BOOTSTRAP-OFFLINE");
1013 stateTransitionPriorityList.add("OFFLINE-IDLE");
1014 stateTransitionPriorityList.add("IDLE-OFFLINE");
1015 stateTransitionPriorityList.add("IDLE-DROPPED");
1016 stateTransitionPriorityList.add("ERROR-IDLED");
1017 record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
1018 stateTransitionPriorityList);
1019 return new StateModelDefinition(record);
1020 }
1021
1022 public static String znrecordToString(ZNRecord record)
1023 {
1024 StringBuffer sb = new StringBuffer();
1025 sb.append(record.getId() + "\n");
1026 Map<String, String> simpleFields = record.getSimpleFields();
1027 if (simpleFields != null)
1028 {
1029 sb.append("simpleFields\n");
1030 for (String key : simpleFields.keySet())
1031 {
1032 sb.append(" " + key + "\t: " + simpleFields.get(key) + "\n");
1033 }
1034 }
1035
1036 Map<String, List<String>> listFields = record.getListFields();
1037 sb.append("listFields\n");
1038 for (String key : listFields.keySet())
1039 {
1040 List<String> list = listFields.get(key);
1041 sb.append(" " + key + "\t: ");
1042 for (String listValue : list)
1043 {
1044 sb.append(listValue + ", ");
1045 }
1046 sb.append("\n");
1047 }
1048
1049 Map<String, Map<String, String>> mapFields = record.getMapFields();
1050 sb.append("mapFields\n");
1051 for (String key : mapFields.keySet())
1052 {
1053 Map<String, String> map = mapFields.get(key);
1054 sb.append(" " + key + "\t: \n");
1055 for (String mapKey : map.keySet())
1056 {
1057 sb.append(" " + mapKey + "\t: " + map.get(mapKey) + "\n");
1058 }
1059 }
1060
1061 return sb.toString();
1062 }
1063
1064 public static interface Verifier {
1065 boolean verify() throws Exception;
1066 }
1067
1068 public static boolean verify(Verifier verifier, long timeout) throws Exception {
1069 long start = System.currentTimeMillis();
1070 do {
1071 boolean result = verifier.verify();
1072 if (result || (System.currentTimeMillis() - start) > timeout) {
1073 return result;
1074 }
1075 Thread.sleep(100);
1076 } while (true);
1077 }
1078 }