View Javadoc

1   package org.apache.helix.tools;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.Comparator;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.TreeMap;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.CountDownLatch;
31  
32  import org.I0Itec.zkclient.exception.ZkBadVersionException;
33  import org.I0Itec.zkclient.exception.ZkNodeExistsException;
34  import org.apache.helix.HelixManager;
35  import org.apache.helix.ZNRecord;
36  import org.apache.helix.manager.zk.ZNRecordSerializer;
37  import org.apache.helix.manager.zk.ZkClient;
38  import org.apache.helix.store.PropertyJsonComparator;
39  import org.apache.helix.store.PropertyJsonSerializer;
40  import org.apache.helix.store.PropertyStoreException;
41  import org.apache.helix.tools.TestCommand.CommandType;
42  import org.apache.log4j.Logger;
43  import org.apache.zookeeper.data.Stat;
44  
45  
46  /**
47   * a test is structured logically as a list of commands a command has three parts: COMMAND
48   * | TRIGGER | ARG'S COMMAND could be: modify, verify, start, stop
49   * 
50   * TRIGGER is optional and consists of start-time, timeout, and expect-value which means
51   * the COMMAND is triggered between [start-time, start-time + timeout] and is triggered
52   * when the value in concern equals to expect-value
53   * 
54   * ARG's format depends on COMMAND if COMMAND is modify/verify, arg is in form of:
55   * <znode-path, property-type (SIMPLE, LIST, or MAP), operation(+, -, ==, !=), key,
56   * update-value> in which key is k1 for SIMPLE, k1|index for LIST, and k1|k2 for MAP field
57   * if COMMAND is start/stop, arg is a thread handler
58   * 
59   * 
60   */
61  
62  public class TestExecutor
63  {
64    /**
65     * SIMPLE: simple field change LIST: list field change MAP: map field change ZNODE:
66     * entire znode change
67     */
68    public enum ZnodePropertyType
69    {
70      SIMPLE, LIST, MAP, ZNODE
71    }
72  
73    private enum ZnodeModValueType
74    {
75      INVALID, SINGLE_VALUE, LIST_VALUE, MAP_VALUE, ZNODE_VALUE
76    }
77  
78    private static Logger                                 logger              =
79                                                                                  Logger.getLogger(TestExecutor.class);
80    private static final long                             SLEEP_TIME          = 500;                                                   // in
81                                                                                                                                        // ms
82  
83    private final static PropertyJsonComparator<String>   STRING_COMPARATOR   =
84                                                                                  new PropertyJsonComparator<String>(String.class);
85    private final static PropertyJsonSerializer<ZNRecord> ZNRECORD_SERIALIZER =
86                                                                                  new PropertyJsonSerializer<ZNRecord>(ZNRecord.class);
87  
88    private static ZnodeModValueType getValueType(ZnodePropertyType type, String key)
89    {
90      ZnodeModValueType valueType = ZnodeModValueType.INVALID;
91      switch (type)
92      {
93      case SIMPLE:
94        if (key == null)
95        {
96          logger.warn("invalid key for simple field: key is null");
97        }
98        else
99        {
100         String keyParts[] = key.split("/");
101         if (keyParts.length != 1)
102         {
103           logger.warn("invalid key for simple field: " + key
104               + ", expect 1 part: key1 (no slash)");
105         }
106         else
107         {
108           valueType = ZnodeModValueType.SINGLE_VALUE;
109         }
110       }
111       break;
112     case LIST:
113       if (key == null)
114       {
115         logger.warn("invalid key for simple field: key is null");
116       }
117       else
118       {
119         String keyParts[] = key.split("/");
120         if (keyParts.length < 1 || keyParts.length > 2)
121         {
122           logger.warn("invalid key for list field: " + key
123               + ", expect 1 or 2 parts: key1 or key1/index)");
124         }
125         else if (keyParts.length == 1)
126         {
127           valueType = ZnodeModValueType.LIST_VALUE;
128         }
129         else
130         {
131           try
132           {
133             int index = Integer.parseInt(keyParts[1]);
134             if (index < 0)
135             {
136               logger.warn("invalid key for list field: " + key + ", index < 0");
137             }
138             else
139             {
140               valueType = ZnodeModValueType.SINGLE_VALUE;
141             }
142           }
143           catch (NumberFormatException e)
144           {
145             logger.warn("invalid key for list field: " + key
146                 + ", part-2 is NOT an integer");
147           }
148         }
149       }
150       break;
151     case MAP:
152       if (key == null)
153       {
154         logger.warn("invalid key for simple field: key is null");
155       }
156       else
157       {
158         String keyParts[] = key.split("/");
159         if (keyParts.length < 1 || keyParts.length > 2)
160         {
161           logger.warn("invalid key for map field: " + key
162               + ", expect 1 or 2 parts: key1 or key1/key2)");
163         }
164         else if (keyParts.length == 1)
165         {
166           valueType = ZnodeModValueType.MAP_VALUE;
167         }
168         else
169         {
170           valueType = ZnodeModValueType.SINGLE_VALUE;
171         }
172       }
173       break;
174     case ZNODE:
175       valueType = ZnodeModValueType.ZNODE_VALUE;
176     default:
177       break;
178     }
179     return valueType;
180   }
181 
182   private static String getSingleValue(ZNRecord record, ZnodePropertyType type, String key)
183   {
184     if (record == null || key == null)
185     {
186       return null;
187     }
188 
189     String value = null;
190     String keyParts[] = key.split("/");
191 
192     switch (type)
193     {
194     case SIMPLE:
195       value = record.getSimpleField(key);
196       break;
197     case LIST:
198       List<String> list = record.getListField(keyParts[0]);
199       if (list == null)
200       {
201         logger.warn("invalid key for list field: " + key
202             + ", map for key part-1 doesn't exist");
203         return null;
204       }
205       int idx = Integer.parseInt(keyParts[1]);
206       value = list.get(idx);
207       break;
208     case MAP:
209       Map<String, String> map = record.getMapField(keyParts[0]);
210       if (map == null)
211       {
212         logger.warn("invalid key for map field: " + key
213             + ", map for key part-1 doesn't exist");
214         return null;
215       }
216       value = map.get(keyParts[1]);
217       break;
218     default:
219       break;
220     }
221 
222     return value;
223   }
224 
225   private static List<String> getListValue(ZNRecord record, String key)
226   {
227     if (record == null)
228     {
229       return null;
230     }
231     return record.getListField(key);
232   }
233 
234   private static Map<String, String> getMapValue(ZNRecord record, String key)
235   {
236     return record.getMapField(key);
237   }
238 
239   // comparator's for single/list/map-value
240   private static boolean compareSingleValue(String actual,
241                                             String expect,
242                                             String key,
243                                             ZNRecord diff)
244   {
245     boolean ret = (STRING_COMPARATOR.compare(actual, expect) == 0);
246 
247     if (diff != null)
248     {
249       diff.setSimpleField(key + "/expect", expect);
250       diff.setSimpleField(key + "/actual", actual);
251     }
252     return ret;
253   }
254 
255   private static boolean compareListValue(List<String> actualList,
256                                           List<String> expectList,
257                                           String key,
258                                           ZNRecord diff)
259   {
260     boolean ret = true;
261     if (actualList == null && expectList == null)
262     {
263       ret = true;
264     }
265     else if (actualList == null && expectList != null)
266     {
267       ret = false;
268       if (diff != null)
269       {
270         diff.setListField(key + "/expect", expectList);
271       }
272     }
273     else if (actualList != null && expectList == null)
274     {
275       ret = false;
276       if (diff != null)
277       {
278         diff.setListField(key + "/actual", actualList);
279       }
280     }
281     else
282     {
283       Iterator<String> itrActual = actualList.iterator();
284       Iterator<String> itrExpect = expectList.iterator();
285       if (diff != null && diff.getListField(key + "/expect") == null)
286       {
287         diff.setListField(key + "/expect", new ArrayList<String>());
288       }
289 
290       if (diff != null && diff.getListField(key + "/actual") == null)
291       {
292         diff.setListField(key + "/actual", new ArrayList<String>());
293       }
294 
295       while (itrActual.hasNext() && itrExpect.hasNext())
296       {
297         String actual = itrActual.next();
298         String expect = itrExpect.next();
299 
300         if (STRING_COMPARATOR.compare(actual, expect) != 0)
301         {
302           ret = false;
303           if (diff != null)
304           {
305             diff.getListField(key + "/expect").add(expect);
306             diff.getListField(key + "/actual").add(actual);
307           }
308         }
309       }
310 
311       while (itrActual.hasNext())
312       {
313         String actual = itrActual.next();
314         if (diff != null)
315         {
316           diff.getListField(key + "/actual").add(actual);
317         }
318       }
319 
320       while (itrExpect.hasNext())
321       {
322         String expect = itrExpect.next();
323         if (diff != null)
324         {
325           diff.getListField(key + "/expect").add(expect);
326         }
327       }
328     }
329     return ret;
330   }
331 
332   private static void setMapField(ZNRecord record, String key1, String key2, String value)
333   {
334     if (record.getMapField(key1) == null)
335     {
336       record.setMapField(key1, new TreeMap<String, String>());
337     }
338     record.getMapField(key1).put(key2, value);
339   }
340 
341   private static boolean compareMapValue(Map<String, String> actualMap,
342                                          Map<String, String> expectMap,
343                                          String mapKey,
344                                          ZNRecord diff)
345   {
346     boolean ret = true;
347     if (actualMap == null && expectMap == null)
348     {
349       ret = true;
350     }
351     else if (actualMap == null && expectMap != null)
352     {
353       ret = false;
354       if (diff != null)
355       {
356         diff.setMapField(mapKey + "/expect", expectMap);
357       }
358     }
359     else if (actualMap != null && expectMap == null)
360     {
361       ret = false;
362       if (diff != null)
363       {
364         diff.setMapField(mapKey + "/actual", actualMap);
365       }
366 
367     }
368     else
369     {
370       for (String key : actualMap.keySet())
371       {
372         String actual = actualMap.get(key);
373         if (!expectMap.containsKey(key))
374         {
375           ret = false;
376 
377           if (diff != null)
378           {
379             setMapField(diff, mapKey + "/actual", key, actual);
380           }
381         }
382         else
383         {
384           String expect = expectMap.get(key);
385           if (STRING_COMPARATOR.compare(actual, expect) != 0)
386           {
387             ret = false;
388             if (diff != null)
389             {
390               setMapField(diff, mapKey + "/actual", key, actual);
391               setMapField(diff, mapKey + "/expect", key, expect);
392             }
393           }
394         }
395       }
396 
397       for (String key : expectMap.keySet())
398       {
399         String expect = expectMap.get(key);
400         if (!actualMap.containsKey(key))
401         {
402           ret = false;
403 
404           if (diff != null)
405           {
406             setMapField(diff, mapKey + "/expect", key, expect);
407           }
408         }
409         else
410         {
411           String actual = actualMap.get(key);
412           if (STRING_COMPARATOR.compare(actual, expect) != 0)
413           {
414             ret = false;
415             if (diff != null)
416             {
417               setMapField(diff, mapKey + "/actual", key, actual);
418               setMapField(diff, mapKey + "/expect", key, expect);
419             }
420           }
421         }
422       }
423     }
424     return ret;
425   }
426 
427   private static void setZNRecord(ZNRecord diff, ZNRecord record, String keySuffix)
428   {
429     if (diff == null || record == null)
430     {
431       return;
432     }
433 
434     for (String key : record.getSimpleFields().keySet())
435     {
436       diff.setSimpleField(key + "/" + keySuffix, record.getSimpleField(key));
437     }
438 
439     for (String key : record.getListFields().keySet())
440     {
441       diff.setListField(key + "/" + keySuffix, record.getListField(key));
442     }
443 
444     for (String key : record.getMapFields().keySet())
445     {
446       diff.setMapField(key + "/" + keySuffix, record.getMapField(key));
447     }
448   }
449 
450   private static boolean compareZnodeValue(ZNRecord actual, ZNRecord expect, ZNRecord diff)
451   {
452     boolean ret = true;
453     if (actual == null && expect == null)
454     {
455       ret = true;
456     }
457     else if (actual == null && expect != null)
458     {
459       ret = false;
460       if (diff != null)
461       {
462         setZNRecord(diff, expect, "expect");
463       }
464     }
465     else if (actual != null && expect == null)
466     {
467       ret = false;
468       if (diff != null)
469       {
470         setZNRecord(diff, actual, "actual");
471       }
472     }
473     else
474     {
475       for (String key : actual.getSimpleFields().keySet())
476       {
477         if (compareSingleValue(actual.getSimpleField(key),
478                                expect.getSimpleField(key),
479                                key,
480                                diff) == false)
481         {
482           ret = false;
483         }
484       }
485 
486       for (String key : expect.getMapFields().keySet())
487       {
488         if (!actual.getMapFields().containsKey(key))
489         {
490           if (diff != null)
491           {
492             ret = false;
493             diff.setMapField(key + "/expect", expect.getMapField(key));
494           }
495         }
496         else
497         {
498           if (compareMapValue(actual.getMapField(key), expect.getMapField(key), key, diff) == false)
499           {
500             ret = false;
501           }
502         }
503       }
504 
505       for (String key : actual.getMapFields().keySet())
506       {
507         if (!expect.getMapFields().containsKey(key))
508         {
509           if (diff != null)
510           {
511             ret = false;
512             diff.setMapField(key + "/actual", actual.getMapField(key));
513           }
514         }
515         else
516         {
517           if (compareMapValue(actual.getMapField(key), expect.getMapField(key), key, diff) == false)
518           {
519             ret = false;
520           }
521         }
522       }
523     }
524     return ret;
525   }
526 
527   private static void resetZNRecord(ZNRecord record)
528   {
529     if (record != null)
530     {
531       record.getSimpleFields().clear();
532       record.getListFields().clear();
533       record.getMapFields().clear();
534     }
535   }
536 
537   private static boolean isValueExpected(ZNRecord current,
538                                          ZnodePropertyType type,
539                                          String key,
540                                          ZnodeValue expect,
541                                          ZNRecord diff)
542   {
543     // expect value = null means not expect any value
544     if (expect == null)
545     {
546       return true;
547     }
548 
549     boolean result = false;
550     resetZNRecord(diff);
551     ZnodeModValueType valueType = getValueType(type, key);
552     switch (valueType)
553     {
554     case SINGLE_VALUE:
555       String singleValue = getSingleValue(current, type, key);
556       result = compareSingleValue(singleValue, expect._singleValue, key, diff);
557       break;
558     case LIST_VALUE:
559       List<String> listValue = getListValue(current, key);
560       result = compareListValue(listValue, expect._listValue, key, diff);
561       break;
562     case MAP_VALUE:
563       Map<String, String> mapValue = getMapValue(current, key);
564       result = compareMapValue(mapValue, expect._mapValue, key, diff);
565       break;
566     case ZNODE_VALUE:
567       result = compareZnodeValue(current, expect._znodeValue, diff);
568       break;
569     case INVALID:
570       break;
571     default:
572       break;
573     }
574     return result;
575   }
576 
577   private static void setSingleValue(ZNRecord record,
578                                      ZnodePropertyType type,
579                                      String key,
580                                      String value)
581   {
582     String keyParts[] = key.split("/");
583 
584     switch (type)
585     {
586     case SIMPLE:
587       record.setSimpleField(key, value);
588       break;
589     case LIST:
590       List<String> list = record.getListField(keyParts[0]);
591       if (list == null)
592       {
593         logger.warn("invalid key for list field: " + key
594             + ", value for key part-1 doesn't exist");
595         return;
596       }
597       int idx = Integer.parseInt(keyParts[1]);
598       list.remove(idx);
599       list.add(idx, value);
600       break;
601     case MAP:
602       Map<String, String> map = record.getMapField(keyParts[0]);
603       if (map == null)
604       {
605         logger.warn("invalid key for map field: " + key
606             + ", value for key part-1 doesn't exist");
607         return;
608       }
609       map.put(keyParts[1], value);
610       break;
611     default:
612       break;
613     }
614   }
615 
616   private static void setListValue(ZNRecord record, String key, List<String> value)
617   {
618     record.setListField(key, value);
619   }
620 
621   private static void setMapValue(ZNRecord record, String key, Map<String, String> value)
622   {
623     record.setMapField(key, value);
624   }
625 
626   private static void removeSingleValue(ZNRecord record,
627                                         ZnodePropertyType type,
628                                         String key)
629   {
630     if (record == null)
631     {
632       return;
633     }
634 
635     String keyParts[] = key.split("/");
636     switch (type)
637     {
638     case SIMPLE:
639       record.getSimpleFields().remove(key);
640       break;
641     case LIST:
642       List<String> list = record.getListField(keyParts[0]);
643       if (list == null)
644       {
645         logger.warn("invalid key for list field: " + key
646             + ", value for key part-1 doesn't exist");
647         return;
648       }
649       int idx = Integer.parseInt(keyParts[1]);
650       list.remove(idx);
651       break;
652     case MAP:
653       Map<String, String> map = record.getMapField(keyParts[0]);
654       if (map == null)
655       {
656         logger.warn("invalid key for map field: " + key
657             + ", value for key part-1 doesn't exist");
658         return;
659       }
660       map.remove(keyParts[1]);
661       break;
662     default:
663       break;
664     }
665   }
666 
667   private static void removeListValue(ZNRecord record, String key)
668   {
669     if (record == null || record.getListFields() == null)
670     {
671       record.getListFields().remove(key);
672     }
673   }
674 
675   private static void removeMapValue(ZNRecord record, String key)
676   {
677     record.getMapFields().remove(key);
678   }
679 
680   private static boolean executeVerifier(ZNRecord actual,
681                                          TestCommand command,
682                                          ZNRecord diff)
683   {
684     final ZnodeOpArg arg = command._znodeOpArg;
685     final ZnodeValue expectValue = command._trigger._expectValue;
686 
687     boolean result =
688         isValueExpected(actual, arg._propertyType, arg._key, expectValue, diff);
689     String operation = arg._operation;
690     if (operation.equals("!="))
691     {
692       result = !result;
693     }
694     else if (!operation.equals("=="))
695     {
696       logger.warn("fail to execute (unsupport operation=" + operation + "):" + operation);
697       result = false;
698     }
699 
700     return result;
701   }
702 
703   private static boolean compareAndSetZnode(ZnodeValue expect,
704                                             ZnodeOpArg arg,
705                                             ZkClient zkClient,
706                                             ZNRecord diff)
707   {
708     String path = arg._znodePath;
709     ZnodePropertyType type = arg._propertyType;
710     String key = arg._key;
711     boolean success = true;
712 
713     // retry 3 times in case there are write conflicts
714     long backoffTime = 20; // ms
715     for (int i = 0; i < 3; i++)
716     {
717       try
718       {
719         Stat stat = new Stat();
720         ZNRecord record = zkClient.<ZNRecord> readDataAndStat(path, stat, true);
721 
722         if (isValueExpected(record, type, key, expect, diff))
723         {
724           if (arg._operation.compareTo("+") == 0)
725           {
726             if (record == null)
727             {
728               record = new ZNRecord("default");
729             }
730             ZnodeModValueType valueType = getValueType(arg._propertyType, arg._key);
731             switch (valueType)
732             {
733             case SINGLE_VALUE:
734               setSingleValue(record,
735                              arg._propertyType,
736                              arg._key,
737                              arg._updateValue._singleValue);
738               break;
739             case LIST_VALUE:
740               setListValue(record, arg._key, arg._updateValue._listValue);
741               break;
742             case MAP_VALUE:
743               setMapValue(record, arg._key, arg._updateValue._mapValue);
744               break;
745             case ZNODE_VALUE:
746               // deep copy
747               record =
748                   ZNRECORD_SERIALIZER.deserialize(ZNRECORD_SERIALIZER.serialize(arg._updateValue._znodeValue));
749               break;
750             case INVALID:
751               break;
752             default:
753               break;
754             }
755           }
756           else if (arg._operation.compareTo("-") == 0)
757           {
758             ZnodeModValueType valueType = getValueType(arg._propertyType, arg._key);
759             switch (valueType)
760             {
761             case SINGLE_VALUE:
762               removeSingleValue(record, arg._propertyType, arg._key);
763               break;
764             case LIST_VALUE:
765               removeListValue(record, arg._key);
766               break;
767             case MAP_VALUE:
768               removeMapValue(record, arg._key);
769               break;
770             case ZNODE_VALUE:
771               record = null;
772               break;
773             case INVALID:
774               break;
775             default:
776               break;
777             }
778           }
779           else
780           {
781             logger.warn("fail to execute (unsupport operation): " + arg._operation);
782             success = false;
783           }
784 
785           if (success == true)
786           {
787             if (record == null)
788             {
789               zkClient.delete(path);
790             }
791             else
792             {
793               try
794               {
795                 zkClient.createPersistent(path, true);
796               }
797               catch (ZkNodeExistsException e)
798               {
799                 // OK
800               }
801               zkClient.writeData(path, record, stat.getVersion());
802             }
803             return true;
804           }
805           else
806           {
807             return false;
808           }
809         }
810       }
811       catch (ZkBadVersionException e)
812       {
813         // e.printStackTrace();
814       }
815       catch (PropertyStoreException e)
816       {
817         // e.printStackTrace();
818       }
819 
820       try
821       {
822         Thread.sleep(backoffTime);
823       }
824       catch (InterruptedException e)
825       {
826         // TODO Auto-generated catch block
827         e.printStackTrace();
828       }
829       backoffTime *= 2;
830     }
831 
832     return false;
833   }
834 
835   private static class ExecuteCommand implements Runnable
836   {
837     private final TestCommand               _command;
838     private final long                      _startTime;
839     private final ZkClient                  _zkClient;
840     private final CountDownLatch            _countDown;
841     private final Map<TestCommand, Boolean> _testResults;
842 
843     public ExecuteCommand(long startTime,
844                           TestCommand command,
845                           CountDownLatch countDown,
846                           ZkClient zkClient,
847                           Map<TestCommand, Boolean> testResults)
848     {
849       _startTime = startTime;
850       _command = command;
851       _countDown = countDown;
852       _zkClient = zkClient;
853       _testResults = testResults;
854     }
855 
856     @Override
857     public void run()
858     {
859       boolean result = false;
860       long now = System.currentTimeMillis();
861       final long timeout = now + _command._trigger._timeout;
862       ZNRecord diff = new ZNRecord("diff");
863       try
864       {
865         if (now < _startTime)
866         {
867           Thread.sleep(_startTime - now);
868         }
869 
870         do
871         {
872           if (_command._commandType == CommandType.MODIFY)
873           {
874             ZnodeOpArg arg = _command._znodeOpArg;
875             final ZnodeValue expectValue = _command._trigger._expectValue;
876             result = compareAndSetZnode(expectValue, arg, _zkClient, diff);
877             // logger.error("result:" + result + ", " + _command);
878 
879             if (result == true)
880             {
881               _command._finishTimestamp = System.currentTimeMillis();
882               _testResults.put(_command, true);
883 
884               break;
885             }
886             else
887             {
888               // logger.error("result:" + result + ", diff:" + diff);
889             }
890           }
891           else if (_command._commandType == CommandType.VERIFY)
892           {
893             ZnodeOpArg arg = _command._znodeOpArg;
894             final String znodePath = arg._znodePath;
895             ZNRecord record = _zkClient.<ZNRecord> readData(znodePath, true);
896 
897             result = executeVerifier(record, _command, diff);
898             // logger.error("result:" + result + ", " + _command.toString());
899             if (result == true)
900             {
901               _command._finishTimestamp = System.currentTimeMillis();
902               _testResults.put(_command, true);
903               break;
904             }
905             else
906             {
907               // logger.error("result:" + result + ", diff:" + diff);
908             }
909           }
910           else if (_command._commandType == CommandType.START)
911           {
912             // TODO add data trigger for START command
913             Thread thread = _command._nodeOpArg._thread;
914             thread.start();
915 
916             result = true;
917             _command._finishTimestamp = System.currentTimeMillis();
918             logger.info("result:" + result + ", " + _command.toString());
919             _testResults.put(_command, true);
920             break;
921           }
922           else if (_command._commandType == CommandType.STOP)
923           {
924             // TODO add data trigger for STOP command
925             HelixManager manager = _command._nodeOpArg._manager;
926             manager.disconnect();
927             Thread thread = _command._nodeOpArg._thread;
928             thread.interrupt();
929 
930             // System.err.println("stop " +
931             // _command._nodeOpArg._manager.getInstanceName());
932             result = true;
933             _command._finishTimestamp = System.currentTimeMillis();
934             logger.info("result:" + result + ", " + _command.toString());
935             _testResults.put(_command, true);
936             break;
937           }
938           else
939           {
940             throw new IllegalArgumentException("Unsupport command type (was "
941                 + _command._commandType + ")");
942           }
943 
944           Thread.sleep(SLEEP_TIME);
945 
946           now = System.currentTimeMillis();
947         }
948         while (now <= timeout);
949       }
950       catch (Exception e)
951       {
952         // TODO Auto-generated catch block
953         e.printStackTrace();
954       }
955       finally
956       {
957         if (result == false)
958         {
959           _command._finishTimestamp = System.currentTimeMillis();
960           logger.error("result:" + result + ", diff: " + diff);
961         }
962         _countDown.countDown();
963         if (_countDown.getCount() == 0)
964         {
965           if (_zkClient != null && _zkClient.getConnection() != null)
966 
967           {
968             _zkClient.close();
969           }
970         }
971       }
972     }
973   }
974 
975   private static Map<TestCommand, Boolean> executeTestHelper(List<TestCommand> commandList,
976                                                              String zkAddr,
977                                                              CountDownLatch countDown)
978   {
979 
980     final Map<TestCommand, Boolean> testResults =
981         new ConcurrentHashMap<TestCommand, Boolean>();
982     ZkClient zkClient = null;
983 
984     zkClient = new ZkClient(zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
985     zkClient.setZkSerializer(new ZNRecordSerializer());
986 
987     // sort on trigger's start time, stable sort
988     Collections.sort(commandList, new Comparator<TestCommand>()
989     {
990       @Override
991       public int compare(TestCommand o1, TestCommand o2)
992       {
993         return (int) (o1._trigger._startTime - o2._trigger._startTime);
994       }
995     });
996 
997     for (TestCommand command : commandList)
998     {
999       testResults.put(command, new Boolean(false));
1000 
1001       TestTrigger trigger = command._trigger;
1002       command._startTimestamp = System.currentTimeMillis() + trigger._startTime;
1003       new Thread(new ExecuteCommand(command._startTimestamp,
1004                                     command,
1005                                     countDown,
1006                                     zkClient,
1007                                     testResults)).start();
1008     }
1009 
1010     return testResults;
1011   }
1012 
1013   public static void executeTestAsync(List<TestCommand> commandList, String zkAddr) throws InterruptedException
1014   {
1015     CountDownLatch countDown = new CountDownLatch(commandList.size());
1016     executeTestHelper(commandList, zkAddr, countDown);
1017   }
1018 
1019   public static Map<TestCommand, Boolean> executeTest(List<TestCommand> commandList,
1020                                                       String zkAddr) throws InterruptedException
1021   {
1022     final CountDownLatch countDown = new CountDownLatch(commandList.size());
1023     Map<TestCommand, Boolean> testResults =
1024         executeTestHelper(commandList, zkAddr, countDown);
1025 
1026     // TODO add timeout
1027     countDown.await();
1028 
1029     return testResults;
1030   }
1031 
1032 }