View Javadoc

1   package org.apache.helix;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.BufferedReader;
23  import java.io.File;
24  import java.io.IOException;
25  import java.io.InputStreamReader;
26  import java.io.PrintWriter;
27  import java.lang.reflect.Method;
28  import java.net.ServerSocket;
29  import java.net.Socket;
30  import java.util.ArrayList;
31  import java.util.Arrays;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.TreeMap;
39  import java.util.TreeSet;
40  import java.util.concurrent.Callable;
41  import java.util.concurrent.ConcurrentHashMap;
42  import java.util.concurrent.CountDownLatch;
43  import java.util.concurrent.TimeUnit;
44  
45  import org.I0Itec.zkclient.IDefaultNameSpace;
46  import org.I0Itec.zkclient.ZkServer;
47  import org.I0Itec.zkclient.exception.ZkNoNodeException;
48  import org.apache.commons.io.FileUtils;
49  import org.apache.helix.PropertyKey.Builder;
50  import org.apache.helix.controller.HelixControllerMain;
51  import org.apache.helix.manager.zk.ZKHelixAdmin;
52  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
53  import org.apache.helix.manager.zk.ZNRecordSerializer;
54  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
55  import org.apache.helix.manager.zk.ZkClient;
56  import org.apache.helix.model.CurrentState;
57  import org.apache.helix.model.ExternalView;
58  import org.apache.helix.model.IdealState.IdealStateModeProperty;
59  import org.apache.helix.model.Message;
60  import org.apache.helix.model.Message.MessageType;
61  import org.apache.helix.model.StateModelDefinition;
62  import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
63  import org.apache.helix.participant.DistClusterControllerStateModelFactory;
64  import org.apache.helix.participant.StateMachineEngine;
65  import org.apache.helix.store.zk.ZNode;
66  import org.apache.helix.tools.ClusterSetup;
67  import org.apache.helix.util.ZKClientPool;
68  import org.apache.log4j.Logger;
69  import org.apache.zookeeper.data.Stat;
70  import org.testng.Assert;
71  
72  
73  public class TestHelper
74  {
75    private static final Logger LOG = Logger.getLogger(TestHelper.class);
76  
77    /**
78     * Returns a unused random port.
79     */
80    public static int getRandomPort() throws IOException {
81        ServerSocket sock = new ServerSocket();
82        sock.bind(null);
83        int port = sock.getLocalPort();
84        sock.close();
85  
86        return port;
87    }
88    
89    static public ZkServer startZkSever(final String zkAddress) throws Exception
90    {
91      List<String> empty = Collections.emptyList();
92      return TestHelper.startZkSever(zkAddress, empty, true);
93    }
94  
95    static public ZkServer startZkSever(final String zkAddress, final String rootNamespace) throws Exception
96    {
97      List<String> rootNamespaces = new ArrayList<String>();
98      rootNamespaces.add(rootNamespace);
99      return TestHelper.startZkSever(zkAddress, rootNamespaces, true);
100   }
101 
102   static public ZkServer startZkSever(final String zkAddress,
103       final List<String> rootNamespaces) throws Exception {
104     return startZkSever(zkAddress, rootNamespaces, true);
105   }
106   
107   static public ZkServer startZkSever(final String zkAddress,
108                                       final List<String> rootNamespaces, boolean overwrite) throws Exception
109   {
110     System.out.println("Start zookeeper at " + zkAddress + " in thread "
111         + Thread.currentThread().getName());
112 
113     String zkDir = zkAddress.replace(':', '_');
114     final String logDir = "/tmp/" + zkDir + "/logs";
115     final String dataDir = "/tmp/" + zkDir + "/dataDir";
116     if (overwrite) {
117       FileUtils.deleteDirectory(new File(dataDir));
118       FileUtils.deleteDirectory(new File(logDir));
119     }
120     ZKClientPool.reset();
121 
122     IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
123     {
124       @Override
125       public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient zkClient)
126       {
127         if (rootNamespaces == null) {
128           return;
129         }
130         
131         for (String rootNamespace : rootNamespaces)
132         {
133           try
134           {
135             zkClient.deleteRecursive(rootNamespace);
136           }
137           catch (Exception e)
138           {
139             LOG.error("fail to deleteRecursive path:" + rootNamespace + "\nexception:"
140                 + e);
141           }
142         }
143       }
144     };
145 
146     int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
147     ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
148     zkServer.start();
149 
150     return zkServer;
151   }
152 
153   static public void stopZkServer(ZkServer zkServer)
154   {
155     if (zkServer != null)
156     {
157       zkServer.shutdown();
158       System.out.println("Shut down zookeeper at port " + zkServer.getPort()
159           + " in thread " + Thread.currentThread().getName());
160     }
161   }
162 
163   public static StartCMResult startDummyProcess(final String zkAddr,
164                                                 final String clusterName,
165                                                 final String instanceName) throws Exception
166   {
167     StartCMResult result = new StartCMResult();
168     ZkHelixTestManager manager = null;
169     manager = new ZkHelixTestManager(clusterName,
170                                      instanceName,
171                                      InstanceType.PARTICIPANT,
172                                      zkAddr);
173     result._manager = manager;
174     Thread thread = new Thread(new DummyProcessThread(manager, instanceName));
175     result._thread = thread;
176     thread.start();
177 
178     return result;
179   }
180 
181   private static ZkHelixTestManager startHelixController(final String zkConnectString,
182 	      final String clusterName, final String controllerName, final String controllerMode)
183   {
184 	ZkHelixTestManager manager = null;
185     try
186     {
187       if (controllerMode.equalsIgnoreCase(HelixControllerMain.STANDALONE))
188       {
189         manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER, zkConnectString);
190         manager.connect();
191       } else if (controllerMode.equalsIgnoreCase(HelixControllerMain.DISTRIBUTED))
192       {
193         manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
194 
195         DistClusterControllerStateModelFactory stateModelFactory = new DistClusterControllerStateModelFactory(
196             zkConnectString);
197 
198         StateMachineEngine stateMach = manager.getStateMachineEngine();
199         stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
200         manager.connect();
201       } else
202       {
203         LOG.error("cluster controller mode:" + controllerMode + " NOT supported");
204       }
205     } catch (Exception e)
206     {
207       // TODO Auto-generated catch block
208       e.printStackTrace();
209     }
210 
211     return manager;
212   }
213   
214   // TODO refactor this
215   public static StartCMResult startController(final String clusterName,
216                                               final String controllerName,
217                                               final String zkConnectString,
218                                               final String controllerMode) throws Exception
219   {
220     final StartCMResult result = new StartCMResult();
221     final ZkHelixTestManager manager = startHelixController(zkConnectString,
222                                                  	clusterName,
223                                                  	controllerName,
224                                                  	controllerMode);
225     result._manager = manager;
226 
227     Thread thread = new Thread(new Runnable()
228     {
229       @Override
230       public void run()
231       {
232         // ClusterManager manager = null;
233 
234         try
235         {
236 
237           Thread.currentThread().join();
238         }
239         catch (InterruptedException e)
240         {
241           String msg =
242               "controller:" + controllerName + ", " + Thread.currentThread().getName()
243                   + " interrupted";
244           LOG.info(msg);
245           // System.err.println(msg);
246         }
247         catch (Exception e)
248         {
249           e.printStackTrace();
250         }
251       }
252     });
253 
254     thread.start();
255     result._thread = thread;
256     return result;
257   }
258 
259   public static class StartCMResult
260   {
261     public Thread       _thread;
262     public ZkHelixTestManager _manager;
263 
264   }
265 
266   public static void setupEmptyCluster(ZkClient zkClient, String clusterName)
267   {
268     ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
269     admin.addCluster(clusterName, true);
270   }
271 
272   /**
273    * convert T[] to set<T>
274    * 
275    * @param s
276    * @return
277    */
278   public static <T> Set<T> setOf(T... s)
279   {
280     Set<T> set = new HashSet<T>(Arrays.asList(s));
281     return set;
282   }
283 
284 //  public static void verifyWithTimeout(String verifierName, Object... args)
285 //  {
286 //    verifyWithTimeout(verifierName, 30 * 1000, args);
287 //  }
288 
289   /**
290    * generic method for verification with a timeout
291    * 
292    * @param verifierName
293    * @param args
294    */
295   public static void verifyWithTimeout(String verifierName, long timeout, Object... args)
296   {
297     final long sleepInterval = 1000; // in ms
298     final int loop = (int) (timeout / sleepInterval) + 1;
299     try
300     {
301       boolean result = false;
302       int i = 0;
303       for (; i < loop; i++)
304       {
305         Thread.sleep(sleepInterval);
306         // verifier should be static method
307         result = (Boolean) TestHelper.getMethod(verifierName).invoke(null, args);
308 
309         if (result == true)
310         {
311           break;
312         }
313       }
314 
315       // debug
316       // LOG.info(verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify ("
317       // + result + ")");
318       System.err.println(verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify "
319           + " (" + result + ")");
320       LOG.debug("args:" + Arrays.toString(args));
321       // System.err.println("args:" + Arrays.toString(args));
322 
323       if (result == false)
324       {
325         LOG.error(verifierName + " fails");
326         LOG.error("args:" + Arrays.toString(args));
327       }
328 
329       Assert.assertTrue(result);
330     }
331     catch (Exception e)
332     {
333       // TODO Auto-generated catch block
334       e.printStackTrace();
335     }
336   }
337 
338   private static Method getMethod(String name)
339   {
340     Method[] methods = TestHelper.class.getMethods();
341     for (Method method : methods)
342     {
343       if (name.equals(method.getName()))
344       {
345         return method;
346       }
347     }
348     return null;
349   }
350 
351   public static boolean verifyEmptyCurStateAndExtView(String clusterName,
352                                                       String resourceName,
353                                                       Set<String> instanceNames,
354                                                       String zkAddr)
355   {
356     ZkClient zkClient = new ZkClient(zkAddr);
357     zkClient.setZkSerializer(new ZNRecordSerializer());
358 
359     try
360     {
361       ZKHelixDataAccessor accessor =
362           new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
363       Builder keyBuilder = accessor.keyBuilder();
364 
365       for (String instanceName : instanceNames)
366       {
367         List<String> sessionIds =
368             accessor.getChildNames(keyBuilder.sessions(instanceName));
369 
370         for (String sessionId : sessionIds)
371         {
372           CurrentState curState =
373               accessor.getProperty(keyBuilder.currentState(instanceName,
374                                                            sessionId,
375                                                            resourceName));
376 
377           if (curState != null && curState.getRecord().getMapFields().size() != 0)
378           {
379             return false;
380           }
381         }
382 
383         ExternalView extView =
384             accessor.getProperty(keyBuilder.externalView(resourceName));
385 
386         if (extView != null && extView.getRecord().getMapFields().size() != 0)
387         {
388           return false;
389         }
390 
391       }
392 
393       return true;
394     }
395     finally
396     {
397       zkClient.close();
398     }
399   }
400 
401   public static boolean verifyNotConnected(HelixManager manager)
402   {
403     return !manager.isConnected();
404   }
405 
406   public static void setupCluster(String clusterName,
407                                   String zkAddr,
408                                   int startPort,
409                                   String participantNamePrefix,
410                                   String resourceNamePrefix,
411                                   int resourceNb,
412                                   int partitionNb,
413                                   int nodesNb,
414                                   int replica,
415                                   String stateModelDef,
416                                   boolean doRebalance) throws Exception
417   {
418     TestHelper.setupCluster(clusterName,
419                             zkAddr,
420                             startPort,
421                             participantNamePrefix,
422                             resourceNamePrefix,
423                             resourceNb,
424                             partitionNb,
425                             nodesNb,
426                             replica,
427                             stateModelDef,
428                             IdealStateModeProperty.AUTO,
429                             doRebalance);
430   }
431 
432   public static void setupCluster(String clusterName,
433                                   String ZkAddr,
434                                   int startPort,
435                                   String participantNamePrefix,
436                                   String resourceNamePrefix,
437                                   int resourceNb,
438                                   int partitionNb,
439                                   int nodesNb,
440                                   int replica,
441                                   String stateModelDef,
442                                   IdealStateModeProperty mode,
443                                   boolean doRebalance) throws Exception
444   {
445     ZkClient zkClient = new ZkClient(ZkAddr);
446     if (zkClient.exists("/" + clusterName))
447     {
448       LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
449       zkClient.deleteRecursive("/" + clusterName);
450     }
451 
452     ClusterSetup setupTool = new ClusterSetup(ZkAddr);
453     setupTool.addCluster(clusterName, true);
454 
455     for (int i = 0; i < nodesNb; i++)
456     {
457       int port = startPort + i;
458       setupTool.addInstanceToCluster(clusterName, participantNamePrefix + "_" + port);
459     }
460 
461     for (int i = 0; i < resourceNb; i++)
462     {
463       String resourceName = resourceNamePrefix + i;
464       setupTool.addResourceToCluster(clusterName, resourceName, partitionNb, stateModelDef, mode.toString());
465       if (doRebalance)
466       {
467         setupTool.rebalanceStorageCluster(clusterName, resourceName, replica);
468       }
469     }
470     zkClient.close();
471   }
472 
473   /**
474    * 
475    * @param stateMap
476    *          : "ResourceName/partitionKey" -> setOf(instances)
477    * @param state
478    *          : MASTER|SLAVE|ERROR...
479    */
480   public static void verifyState(String clusterName,
481                                  String zkAddr,
482                                  Map<String, Set<String>> stateMap,
483                                  String state)
484   {
485     ZkClient zkClient = new ZkClient(zkAddr);
486     zkClient.setZkSerializer(new ZNRecordSerializer());
487 
488     try
489     {
490       ZKHelixDataAccessor accessor =
491           new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
492       Builder keyBuilder = accessor.keyBuilder();
493 
494       for (String resGroupPartitionKey : stateMap.keySet())
495       {
496         Map<String, String> retMap = getResourceAndPartitionKey(resGroupPartitionKey);
497         String resGroup = retMap.get("RESOURCE");
498         String partitionKey = retMap.get("PARTITION");
499 
500         ExternalView extView = accessor.getProperty(keyBuilder.externalView(resGroup));
501         for (String instance : stateMap.get(resGroupPartitionKey))
502         {
503           String actualState = extView.getStateMap(partitionKey).get(instance);
504           Assert.assertNotNull(actualState, "externalView doesn't contain state for "
505               + resGroup + "/" + partitionKey + " on " + instance + " (expect " + state
506               + ")");
507 
508           Assert.assertEquals(actualState, state, "externalView for " + resGroup + "/"
509               + partitionKey + " on " + instance + " is " + actualState + " (expect "
510               + state + ")");
511         }
512       }
513     }
514     finally
515     {
516       zkClient.close();
517     }
518   }
519 
520   /**
521    * 
522    * @param resourcePartition
523    *          : key is in form of "resource/partitionKey" or "resource_x"
524    * 
525    * @return
526    */
527   private static Map<String, String> getResourceAndPartitionKey(String resourcePartition)
528   {
529     String resourceName;
530     String partitionName;
531     int idx = resourcePartition.indexOf('/');
532     if (idx > -1)
533     {
534       resourceName = resourcePartition.substring(0, idx);
535       partitionName = resourcePartition.substring(idx + 1);
536     }
537     else
538     {
539       idx = resourcePartition.lastIndexOf('_');
540       resourceName = resourcePartition.substring(0, idx);
541       partitionName = resourcePartition;
542     }
543 
544     Map<String, String> retMap = new HashMap<String, String>();
545     retMap.put("RESOURCE", resourceName);
546     retMap.put("PARTITION", partitionName);
547     return retMap;
548   }
549 
550   public static <T> Map<String, T> startThreadsConcurrently(final int nrThreads,
551                                                             final Callable<T> method,
552                                                             final long timeout)
553   {
554     final CountDownLatch startLatch = new CountDownLatch(1);
555     final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
556     final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
557     final List<Thread> threadList = new ArrayList<Thread>();
558 
559     for (int i = 0; i < nrThreads; i++)
560     {
561       Thread thread = new Thread()
562       {
563         @Override
564         public void run()
565         {
566           try
567           {
568             boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
569             if (isTimeout)
570             {
571               LOG.error("Timeout while waiting for start latch");
572             }
573           }
574           catch (InterruptedException ex)
575           {
576             LOG.error("Interrupted while waiting for start latch");
577           }
578 
579           try
580           {
581             T result = method.call();
582             if (result != null)
583             {
584               resultsMap.put("thread_" + this.getId(), result);
585             }
586             LOG.debug("result=" + result);
587           }
588           catch (Exception e)
589           {
590             LOG.error("Exeption in executing " + method.getClass().getName(), e);
591           }
592 
593           finishCounter.countDown();
594         }
595       };
596       threadList.add(thread);
597       thread.start();
598     }
599     startLatch.countDown();
600 
601     // wait for all thread to complete
602     try
603     {
604       boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
605       if (isTimeout)
606       {
607         LOG.error("Timeout while waiting for finish latch. Interrupt all threads");
608         for (Thread thread : threadList)
609         {
610           thread.interrupt();
611         }
612       }
613     }
614     catch (InterruptedException e)
615     {
616       LOG.error("Interrupted while waiting for finish latch", e);
617     }
618 
619     return resultsMap;
620   }
621 
622   public static Message createMessage(String msgId,
623                                       String fromState,
624                                       String toState,
625                                       String tgtName,
626                                       String resourceName,
627                                       String partitionName)
628   {
629     Message msg = new Message(MessageType.STATE_TRANSITION, msgId);
630     msg.setFromState(fromState);
631     msg.setToState(toState);
632     msg.setTgtName(tgtName);
633     msg.setResourceName(resourceName);
634     msg.setPartitionName(partitionName);
635     msg.setStateModelDef("MasterSlave");
636 
637     return msg;
638   }
639 
640   public static String getTestMethodName()
641   {
642     StackTraceElement[] calls = Thread.currentThread().getStackTrace();
643     return calls[2].getMethodName();
644   }
645 
646   public static String getTestClassName()
647   {
648     StackTraceElement[] calls = Thread.currentThread().getStackTrace();
649     String fullClassName = calls[2].getClassName();
650     return fullClassName.substring(fullClassName.lastIndexOf('.') + 1);
651   }
652 
653   public static <T> Map<String, T> startThreadsConcurrently(final List<Callable<T>> methods,
654                                                             final long timeout)
655   {
656     final int nrThreads = methods.size();
657     final CountDownLatch startLatch = new CountDownLatch(1);
658     final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
659     final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
660     final List<Thread> threadList = new ArrayList<Thread>();
661 
662     for (int i = 0; i < nrThreads; i++)
663     {
664       final Callable<T> method = methods.get(i);
665 
666       Thread thread = new Thread()
667       {
668         @Override
669         public void run()
670         {
671           try
672           {
673             boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
674             if (isTimeout)
675             {
676               LOG.error("Timeout while waiting for start latch");
677             }
678           }
679           catch (InterruptedException ex)
680           {
681             LOG.error("Interrupted while waiting for start latch");
682           }
683 
684           try
685           {
686             T result = method.call();
687             if (result != null)
688             {
689               resultsMap.put("thread_" + this.getId(), result);
690             }
691             LOG.debug("result=" + result);
692           }
693           catch (Exception e)
694           {
695             LOG.error("Exeption in executing " + method.getClass().getName(), e);
696           }
697 
698           finishCounter.countDown();
699         }
700       };
701       threadList.add(thread);
702       thread.start();
703     }
704     startLatch.countDown();
705 
706     // wait for all thread to complete
707     try
708     {
709       boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
710       if (isTimeout)
711       {
712         LOG.error("Timeout while waiting for finish latch. Interrupt all threads");
713         for (Thread thread : threadList)
714         {
715           thread.interrupt();
716         }
717       }
718     }
719     catch (InterruptedException e)
720     {
721       LOG.error("Interrupted while waiting for finish latch", e);
722     }
723 
724     return resultsMap;
725   }
726 
727   public static void printCache(Map<String, ZNode> cache)
728   {
729     System.out.println("START:Print cache");
730     TreeMap<String, ZNode> map = new TreeMap<String, ZNode>();
731     map.putAll(cache);
732 
733     for (String key : map.keySet())
734     {
735       ZNode node = map.get(key);
736       TreeSet<String> childSet = new TreeSet<String>();
737       childSet.addAll(node.getChildSet());
738       System.out.print(key + "=" + node.getData() + ", " + childSet + ", "
739           + (node.getStat() == null ? "null\n" : node.getStat()));
740     }
741     System.out.println("END:Print cache");
742   }
743 
744   public static void readZkRecursive(String path,
745                                      Map<String, ZNode> map,
746                                      ZkClient zkclient)
747   {
748     try
749     {
750       Stat stat = new Stat();
751       ZNRecord record = zkclient.readData(path, stat);
752       List<String> childNames = zkclient.getChildren(path);
753       ZNode node = new ZNode(path, record, stat);
754       node.addChildren(childNames);
755       map.put(path, node);
756 
757       for (String childName : childNames)
758       {
759         String childPath = path + "/" + childName;
760         readZkRecursive(childPath, map, zkclient);
761       }
762     }
763     catch (ZkNoNodeException e)
764     {
765       // OK
766     }
767   }
768 
769   public static void readZkRecursive(String path,
770                                      Map<String, ZNode> map,
771                                      BaseDataAccessor<ZNRecord> zkAccessor)
772   {
773     try
774     {
775       Stat stat = new Stat();
776       ZNRecord record = zkAccessor.get(path, stat, 0);
777       List<String> childNames = zkAccessor.getChildNames(path, 0);
778       // System.out.println("childNames: " + childNames);
779       ZNode node = new ZNode(path, record, stat);
780       node.addChildren(childNames);
781       map.put(path, node);
782 
783       if (childNames != null && !childNames.isEmpty())
784       {
785         for (String childName : childNames)
786         {
787           String childPath = path + "/" + childName;
788           readZkRecursive(childPath, map, zkAccessor);
789         }
790       }
791     }
792     catch (ZkNoNodeException e)
793     {
794       // OK
795     }
796   }
797 
798   public static boolean verifyZkCache(List<String> paths,
799                                       BaseDataAccessor<ZNRecord> zkAccessor,
800                                       ZkClient zkclient,
801                                       boolean needVerifyStat)
802   {
803     // read everything
804     Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
805     Map<String, ZNode> cache = new HashMap<String, ZNode>();
806     for (String path : paths)
807     {
808       readZkRecursive(path, zkMap, zkclient);
809       readZkRecursive(path, cache, zkAccessor);
810     }
811     // printCache(map);
812 
813     return verifyZkCache(paths, null, cache, zkMap, needVerifyStat);
814   }
815 
816   public static boolean verifyZkCache(List<String> paths,
817                                       Map<String, ZNode> cache,
818                                       ZkClient zkclient,
819                                       boolean needVerifyStat)
820   {
821     return verifyZkCache(paths, null, cache, zkclient, needVerifyStat);
822   }
823 
824   public static boolean verifyZkCache(List<String> paths,
825                                       List<String> pathsExcludeForStat,
826                                       Map<String, ZNode> cache,
827                                       ZkClient zkclient,
828                                       boolean needVerifyStat)
829   {
830     // read everything on zk under paths
831     Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
832     for (String path : paths)
833     {
834       readZkRecursive(path, zkMap, zkclient);
835     }
836     // printCache(map);
837 
838     return verifyZkCache(paths, pathsExcludeForStat, cache, zkMap, needVerifyStat);
839   }
840 
841   public static boolean verifyZkCache(List<String> paths,
842                                       List<String> pathsExcludeForStat,
843                                       Map<String, ZNode> cache,
844                                       Map<String, ZNode> zkMap,
845                                       boolean needVerifyStat)
846   {
847     // equal size
848     if (zkMap.size() != cache.size())
849     {
850       System.err.println("size mismatch: cacheSize: " + cache.size() + ", zkMapSize: "
851           + zkMap.size());
852       System.out.println("cache: (" + cache.size() + ")");
853       TestHelper.printCache(cache);
854 
855       System.out.println("zkMap: (" + zkMap.size() + ")");
856       TestHelper.printCache(zkMap);
857 
858       return false;
859     }
860 
861     // everything in cache is also in map
862     for (String path : cache.keySet())
863     {
864       ZNode cacheNode = cache.get(path);
865       ZNode zkNode = zkMap.get(path);
866 
867       if (zkNode == null)
868       {
869         // in cache but not on zk
870         System.err.println("path: " + path + " in cache but not on zk: inCacheNode: "
871             + cacheNode);
872         return false;
873       }
874 
875       if ((zkNode.getData() == null && cacheNode.getData() != null)
876           || (zkNode.getData() != null && cacheNode.getData() == null)
877           || (zkNode.getData() != null && cacheNode.getData() != null && !zkNode.getData()
878                                                                                 .equals(cacheNode.getData())))
879       {
880         // data not equal
881         System.err.println("data mismatch on path: " + path + ", inCache: "
882             + cacheNode.getData() + ", onZk: " + zkNode.getData());
883         return false;
884       }
885 
886       if ((zkNode.getChildSet() == null && cacheNode.getChildSet() != null)
887           || (zkNode.getChildSet() != null && cacheNode.getChildSet() == null)
888           || (zkNode.getChildSet() != null && cacheNode.getChildSet() != null && !zkNode.getChildSet()
889                                                                                         .equals(cacheNode.getChildSet())))
890       {
891         // childSet not equal
892         System.err.println("childSet mismatch on path: " + path + ", inCache: "
893             + cacheNode.getChildSet() + ", onZk: " + zkNode.getChildSet());
894         return false;
895       }
896 
897       if (needVerifyStat && pathsExcludeForStat != null
898           && !pathsExcludeForStat.contains(path))
899       {
900         if (cacheNode.getStat() == null || !zkNode.getStat().equals(cacheNode.getStat()))
901         {
902           // stat not equal
903           System.err.println("Stat mismatch on path: " + path + ", inCache: "
904               + cacheNode.getStat() + ", onZk: " + zkNode.getStat());
905           return false;
906         }
907       }
908     }
909 
910     return true;
911   }
912 
913   public static StateModelDefinition generateStateModelDefForBootstrap()
914   {
915     ZNRecord record = new ZNRecord("Bootstrap");
916     record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "IDLE");
917     List<String> statePriorityList = new ArrayList<String>();
918     statePriorityList.add("ONLINE");
919     statePriorityList.add("BOOTSTRAP");
920     statePriorityList.add("OFFLINE");
921     statePriorityList.add("IDLE");
922     statePriorityList.add("DROPPED");
923     statePriorityList.add("ERROR");
924     record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
925                         statePriorityList);
926     for (String state : statePriorityList)
927     {
928       String key = state + ".meta";
929       Map<String, String> metadata = new HashMap<String, String>();
930       if (state.equals("ONLINE"))
931       {
932         metadata.put("count", "R");
933         record.setMapField(key, metadata);
934       }
935       else if (state.equals("BOOTSTRAP"))
936       {
937         metadata.put("count", "-1");
938         record.setMapField(key, metadata);
939       }
940       else if (state.equals("OFFLINE"))
941       {
942         metadata.put("count", "-1");
943         record.setMapField(key, metadata);
944       }
945       else if (state.equals("IDLE"))
946       {
947         metadata.put("count", "-1");
948         record.setMapField(key, metadata);
949       }
950       else if (state.equals("DROPPED"))
951       {
952         metadata.put("count", "-1");
953         record.setMapField(key, metadata);
954       }
955       else if (state.equals("ERROR"))
956       {
957         metadata.put("count", "-1");
958         record.setMapField(key, metadata);
959       }
960     }
961 
962     for (String state : statePriorityList)
963     {
964       String key = state + ".next";
965       if (state.equals("ONLINE"))
966       {
967         Map<String, String> metadata = new HashMap<String, String>();
968         metadata.put("BOOTSTRAP", "OFFLINE");
969         metadata.put("OFFLINE", "OFFLINE");
970         metadata.put("DROPPED", "OFFLINE");
971         metadata.put("IDLE", "OFFLINE");
972         record.setMapField(key, metadata);
973       }
974       else if (state.equals("BOOTSTRAP"))
975       {
976         Map<String, String> metadata = new HashMap<String, String>();
977         metadata.put("ONLINE", "ONLINE");
978         metadata.put("OFFLINE", "OFFLINE");
979         metadata.put("DROPPED", "OFFLINE");
980         metadata.put("IDLE", "OFFLINE");
981         record.setMapField(key, metadata);
982       }
983       else if (state.equals("OFFLINE"))
984       {
985         Map<String, String> metadata = new HashMap<String, String>();
986         metadata.put("ONLINE", "BOOTSTRAP");
987         metadata.put("BOOTSTRAP", "BOOTSTRAP");
988         metadata.put("DROPPED", "IDLE");
989         metadata.put("IDLE", "IDLE");
990         record.setMapField(key, metadata);
991       }
992       else if (state.equals("IDLE"))
993       {
994         Map<String, String> metadata = new HashMap<String, String>();
995         metadata.put("ONLINE", "OFFLINE");
996         metadata.put("BOOTSTRAP", "OFFLINE");
997         metadata.put("OFFLINE", "OFFLINE");
998         metadata.put("DROPPED", "DROPPED");
999         record.setMapField(key, metadata);
1000       }
1001       else if (state.equals("ERROR"))
1002       {
1003         Map<String, String> metadata = new HashMap<String, String>();
1004         metadata.put("IDLE", "IDLE");
1005         record.setMapField(key, metadata);
1006       }
1007     }
1008     List<String> stateTransitionPriorityList = new ArrayList<String>();
1009     stateTransitionPriorityList.add("ONLINE-OFFLINE");
1010     stateTransitionPriorityList.add("BOOTSTRAP-ONLINE");
1011     stateTransitionPriorityList.add("OFFLINE-BOOTSTRAP");
1012     stateTransitionPriorityList.add("BOOTSTRAP-OFFLINE");
1013     stateTransitionPriorityList.add("OFFLINE-IDLE");
1014     stateTransitionPriorityList.add("IDLE-OFFLINE");
1015     stateTransitionPriorityList.add("IDLE-DROPPED");
1016     stateTransitionPriorityList.add("ERROR-IDLED");
1017     record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
1018                         stateTransitionPriorityList);
1019     return new StateModelDefinition(record);
1020   }
1021 
1022   public static String znrecordToString(ZNRecord record)
1023   {
1024     StringBuffer sb = new StringBuffer();
1025     sb.append(record.getId() + "\n");
1026     Map<String, String> simpleFields = record.getSimpleFields();
1027     if (simpleFields != null)
1028     {
1029       sb.append("simpleFields\n");
1030       for (String key : simpleFields.keySet())
1031       {
1032         sb.append("  " + key + "\t: " + simpleFields.get(key) + "\n");
1033       }
1034     }
1035 
1036     Map<String, List<String>> listFields = record.getListFields();
1037     sb.append("listFields\n");
1038     for (String key : listFields.keySet())
1039     {
1040       List<String> list = listFields.get(key);
1041       sb.append("  " + key + "\t: ");
1042       for (String listValue : list)
1043       {
1044         sb.append(listValue + ", ");
1045       }
1046       sb.append("\n");
1047     }
1048 
1049     Map<String, Map<String, String>> mapFields = record.getMapFields();
1050     sb.append("mapFields\n");
1051     for (String key : mapFields.keySet())
1052     {
1053       Map<String, String> map = mapFields.get(key);
1054       sb.append("  " + key + "\t: \n");
1055       for (String mapKey : map.keySet())
1056       {
1057         sb.append("    " + mapKey + "\t: " + map.get(mapKey) + "\n");
1058       }
1059     }
1060 
1061     return sb.toString();
1062   }
1063   
1064   public static interface Verifier {
1065     boolean verify() throws Exception;
1066   }
1067   
1068   public static boolean verify(Verifier verifier, long timeout) throws Exception {
1069     long start = System.currentTimeMillis();
1070     do {
1071       boolean result = verifier.verify();
1072       if (result || (System.currentTimeMillis() - start) > timeout) {
1073         return result;
1074       }
1075       Thread.sleep(100);
1076     } while (true);
1077   }
1078 }