View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collections;
26  import java.util.Iterator;
27  import java.util.List;
28  
29  import org.I0Itec.zkclient.DataUpdater;
30  import org.I0Itec.zkclient.IZkChildListener;
31  import org.I0Itec.zkclient.IZkDataListener;
32  import org.I0Itec.zkclient.exception.ZkBadVersionException;
33  import org.I0Itec.zkclient.exception.ZkException;
34  import org.I0Itec.zkclient.exception.ZkNoNodeException;
35  import org.I0Itec.zkclient.exception.ZkNodeExistsException;
36  import org.apache.helix.AccessOption;
37  import org.apache.helix.BaseDataAccessor;
38  import org.apache.helix.ZNRecord;
39  import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
40  import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
41  import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
42  import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
43  import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
44  import org.apache.helix.store.zk.ZNode;
45  import org.apache.helix.util.HelixUtil;
46  import org.apache.log4j.Logger;
47  import org.apache.zookeeper.CreateMode;
48  import org.apache.zookeeper.KeeperException.Code;
49  import org.apache.zookeeper.data.Stat;
50  import org.apache.zookeeper.server.DataTree;
51  
52  
53  public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
54  {
/**
 * Outcome reported by the synchronous, path-creating {@code create} overload of this class.
 */
enum RetCode
{
  /** the node was created by the call */
  OK,

  /** the node was already there, so nothing was created */
  NODE_EXISTS,

  /** the create mode was invalid or an unexpected exception was hit */
  ERROR
}
59  
// class-wide log4j logger; final so it cannot be re-pointed after class initialization
private static final Logger LOG = Logger.getLogger(ZkBaseDataAccessor.class);

// client used for the ZooKeeper calls made by the methods of this accessor
private final ZkClient _zkClient;

/**
 * Builds an accessor that performs its reads and writes through the given client.
 *
 * @param zkClient
 *          client to delegate to; it is stored as-is and not null-checked here
 */
public ZkBaseDataAccessor(ZkClient zkClient)
{
  _zkClient = zkClient;
}
68  
69    /**
70     * sync create
71     */
72    @Override
73    public boolean create(String path, T record, int options)
74    {
75      return create(path, record, null, options) == RetCode.OK;
76    }
77  
78    /**
79     * sync create
80     */
81    public RetCode create(String path, T record, List<String> pathCreated, int options)
82    {
83      CreateMode mode = AccessOption.getMode(options);
84      if (mode == null)
85      {
86        LOG.error("Invalid create mode. options: " + options);
87        return RetCode.ERROR;
88      }
89  
90      boolean retry;
91      do
92      {
93        retry = false;
94        try
95        {
96          _zkClient.create(path, record, mode);
97          if (pathCreated != null)
98            pathCreated.add(path);
99  
100         return RetCode.OK;
101       }
102       catch (ZkNoNodeException e)
103       {
104         // this will happen if parent node does not exist
105         String parentPath = HelixUtil.getZkParentPath(path);
106         try
107         {
108           RetCode rc = create(parentPath, null, pathCreated, AccessOption.PERSISTENT);
109           if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS)
110           {
111             // if parent node created/exists, retry
112             retry = true;
113           }
114         }
115         catch (Exception e1)
116         {
117           LOG.error("Exception while creating path: " + parentPath, e1);
118           return RetCode.ERROR;
119         }
120       }
121       catch (ZkNodeExistsException e)
122       {
123         LOG.warn("Node already exists. path: " + path);
124         return RetCode.NODE_EXISTS;
125       }
126       catch (Exception e)
127       {
128         LOG.error("Exception while creating path: " + path, e);
129         return RetCode.ERROR;
130       }
131     }
132     while (retry);
133 
134     return RetCode.OK;
135   }
136 
137   /**
138    * sync set
139    */
140   @Override
141   public boolean set(String path, T record, int options)
142   {
143     return set(path, record, null, null, -1, options);
144   }
145 
146   /**
147    * sync set
148    * 
149    * @param setstat
150    *          : if node is created instead of set, stat will NOT be set
151    */
152   public boolean set(String path,
153                      T record,
154                      List<String> pathsCreated,
155                      Stat setstat,
156                      int expectVersion,
157                      int options)
158   {
159     CreateMode mode = AccessOption.getMode(options);
160     if (mode == null)
161     {
162       LOG.error("Invalid set mode. options: " + options);
163       return false;
164     }
165 
166     boolean retry;
167     do
168     {
169       retry = false;
170       try
171       {
172         // _zkClient.writeData(path, record);
173         Stat setStat = _zkClient.writeDataGetStat(path, record, expectVersion);
174         if (setstat != null)
175           DataTree.copyStat(setStat, setstat);
176       }
177       catch (ZkNoNodeException e)
178       {
179         // node not exists, try create. in this case, stat will not be set
180         try
181         {
182           RetCode rc = create(path, record, pathsCreated, options);
183           // if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS)
184           // retry = true;
185           switch (rc)
186           {
187           case OK:
188             // not set stat if node is created (instead of set)
189             break;
190           case NODE_EXISTS:
191             retry = true;
192             break;
193           default:
194             LOG.error("Fail to set path by creating: " + path);
195             return false;
196           }
197         }
198         catch (Exception e1)
199         {
200           LOG.error("Exception while setting path by creating: " + path, e);
201           return false;
202         }
203       }
204       catch (ZkBadVersionException e)
205       {
206         throw e;
207       }
208       catch (Exception e)
209       {
210         LOG.error("Exception while setting path: " + path, e);
211         return false;
212       }
213     }
214     while (retry);
215 
216     return true;
217   }
218 
219   /**
220    * sync update
221    */
222   @Override
223   public boolean update(String path, DataUpdater<T> updater, int options)
224   {
225     return update(path, updater, null, null, options) != null;
226   }
227 
228   /**
229    * sync update
230    * 
231    * @return: updatedData on success, or null on fail
232    */
233   public T update(String path,
234                   DataUpdater<T> updater,
235                   List<String> createPaths,
236                   Stat stat,
237                   int options)
238   {
239     CreateMode mode = AccessOption.getMode(options);
240     if (mode == null)
241     {
242       LOG.error("Invalid update mode. options: " + options);
243       return null;
244     }
245 
246     boolean retry;
247     T updatedData = null;
248     do
249     {
250       retry = false;
251       try
252       {
253         Stat readStat = new Stat();
254         T oldData = (T) _zkClient.readData(path, readStat);
255         T newData = updater.update(oldData);
256         Stat setStat = _zkClient.writeDataGetStat(path, newData, readStat.getVersion());
257         if (stat != null)
258         {
259           DataTree.copyStat(setStat, stat);
260         }
261 
262         updatedData = newData;
263       }
264       catch (ZkBadVersionException e)
265       {
266         retry = true;
267       }
268       catch (ZkNoNodeException e)
269       {
270         // node not exist, try create
271         try
272         {
273           T newData = updater.update(null);
274           RetCode rc = create(path, newData, createPaths, options);
275           switch (rc)
276           {
277           case OK:
278             updatedData = newData;
279             break;
280           case NODE_EXISTS:
281             retry = true;
282             break;
283           default:
284             LOG.error("Fail to update path by creating: " + path);
285             return null;
286           }
287         }
288         catch (Exception e1)
289         {
290           LOG.error("Exception while updating path by creating: " + path, e1);
291           return null;
292         }
293       }
294       catch (Exception e)
295       {
296         LOG.error("Exception while updating path: " + path, e);
297         return null;
298       }
299     }
300     while (retry);
301 
302     return updatedData;
303   }
304 
305   /**
306    * sync get
307    * 
308    */
309   @Override
310   public T get(String path, Stat stat, int options)
311   {
312     T data = null;
313     try
314     {
315       data = (T) _zkClient.readData(path, stat);
316     }
317     catch (ZkNoNodeException e)
318     {
319       if (AccessOption.isThrowExceptionIfNotExist(options))
320       {
321         throw e;
322       }
323     }
324     return data;
325   }
326 
327   /**
328    * async get
329    * 
330    */
331   @Override
332   public List<T> get(List<String> paths, List<Stat> stats, int options)
333   {
334     boolean[] needRead = new boolean[paths.size()];
335     Arrays.fill(needRead, true);
336 
337     return get(paths, stats, needRead);
338   }
339 
340   /**
341    * async get
342    */
343   List<T> get(List<String> paths, List<Stat> stats, boolean[] needRead)
344   {
345     if (paths == null || paths.size() == 0)
346     {
347       return Collections.emptyList();
348     }
349 
350     // init stats
351     if (stats != null)
352     {
353       stats.clear();
354       stats.addAll(Collections.<Stat> nCopies(paths.size(), null));
355     }
356 
357     long startT = System.nanoTime();
358 
359     try
360     {
361       // issue asyn get requests
362       GetDataCallbackHandler[] cbList = new GetDataCallbackHandler[paths.size()];
363       for (int i = 0; i < paths.size(); i++)
364       {
365         if (!needRead[i])
366           continue;
367 
368         String path = paths.get(i);
369         cbList[i] = new GetDataCallbackHandler();
370         _zkClient.asyncGetData(path, cbList[i]);
371       }
372 
373       // wait for completion
374       for (int i = 0; i < cbList.length; i++)
375       {
376         if (!needRead[i])
377           continue;
378 
379         GetDataCallbackHandler cb = cbList[i];
380         cb.waitForSuccess();
381       }
382 
383       // construct return results
384       List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
385 
386       for (int i = 0; i < paths.size(); i++)
387       {
388         if (!needRead[i])
389           continue;
390 
391         GetDataCallbackHandler cb = cbList[i];
392         if (Code.get(cb.getRc()) == Code.OK)
393         {
394           @SuppressWarnings("unchecked")
395           T record = (T) _zkClient.deserialize(cb._data, paths.get(i));
396           records.set(i, record);
397           if (stats != null)
398           {
399             stats.set(i, cb._stat);
400           }
401         }
402       }
403 
404       return records;
405     }
406     finally
407     {
408       long endT = System.nanoTime();
409       if (LOG.isTraceEnabled())
410       {
411         LOG.trace("getData_async, size: " + paths.size() + ", paths: " + paths.get(0)
412           + ",... time: " + (endT - startT) + " ns");
413       }
414     }
415   }
416 
417   /**
418    * asyn getChildren
419    * 
420    */
421   @Override
422   public List<T> getChildren(String parentPath, List<Stat> stats, int options)
423   {
424     try
425     {
426       // prepare child paths
427       List<String> childNames = getChildNames(parentPath, options);
428       if (childNames == null || childNames.size() == 0)
429       {
430         return Collections.emptyList();
431       }
432 
433       List<String> paths = new ArrayList<String>();
434       for (String childName : childNames)
435       {
436         String path = parentPath + "/" + childName;
437         paths.add(path);
438       }
439 
440       // remove null record
441       List<Stat> curStats = new ArrayList<Stat>(paths.size());
442       List<T> records = get(paths, curStats, options);
443       Iterator<T> recordIter = records.iterator();
444       Iterator<Stat> statIter = curStats.iterator();
445       while (statIter.hasNext())
446       {
447         recordIter.next();
448         if (statIter.next() == null)
449         {
450           statIter.remove();
451           recordIter.remove();
452         }
453       }
454 
455       if (stats != null)
456       {
457         stats.clear();
458         stats.addAll(curStats);
459       }
460 
461       return records;
462     }
463     catch (ZkNoNodeException e)
464     {
465       return Collections.emptyList();
466     }
467   }
468 
469   /**
470    * sync getChildNames
471    * 
472    * @return null if parentPath doesn't exist
473    */
474   @Override
475   public List<String> getChildNames(String parentPath, int options)
476   {
477     try
478     {
479       List<String> childNames = _zkClient.getChildren(parentPath);
480       Collections.sort(childNames);
481       return childNames;
482     }
483     catch (ZkNoNodeException e)
484     {
485       return null;
486     }
487   }
488 
489   /**
490    * sync exists
491    * 
492    */
493   @Override
494   public boolean exists(String path, int options)
495   {
496     return _zkClient.exists(path);
497   }
498 
499   /**
500    * sync getStat
501    * 
502    */
503   @Override
504   public Stat getStat(String path, int options)
505   {
506     return _zkClient.getStat(path);
507   }
508 
509   /**
510    * sync remove
511    * 
512    */
513   @Override
514   public boolean remove(String path, int options)
515   {
516     try
517     {
518       // optimize on common path
519       _zkClient.delete(path);
520     }
521     catch (ZkException e)
522     {
523       _zkClient.deleteRecursive(path);
524     }
525     return true;
526   }
527 
528   /**
529    * async create. give up on error other than NONODE
530    * 
531    */
532   CreateCallbackHandler[] create(List<String> paths,
533                                  List<T> records,
534                                  boolean[] needCreate,
535                                  List<List<String>> pathsCreated,
536                                  int options)
537   {
538     if ((records != null && records.size() != paths.size())
539         || needCreate.length != paths.size()
540         || (pathsCreated != null && pathsCreated.size() != paths.size()))
541     {
542       throw new IllegalArgumentException("paths, records, needCreate, and pathsCreated should be of same size");
543     }
544 
545     CreateCallbackHandler[] cbList = new CreateCallbackHandler[paths.size()];
546 
547     CreateMode mode = AccessOption.getMode(options);
548     if (mode == null)
549     {
550       LOG.error("Invalid async set mode. options: " + options);
551       return cbList;
552     }
553 
554     boolean retry;
555     do
556     {
557       retry = false;
558 
559       for (int i = 0; i < paths.size(); i++)
560       {
561         if (!needCreate[i])
562           continue;
563 
564         String path = paths.get(i);
565         T record = records == null ? null : records.get(i);
566         cbList[i] = new CreateCallbackHandler();
567         _zkClient.asyncCreate(path, record, mode, cbList[i]);
568       }
569 
570       List<String> parentPaths =
571           new ArrayList<String>(Collections.<String> nCopies(paths.size(), null));
572       boolean failOnNoNode = false;
573 
574       for (int i = 0; i < paths.size(); i++)
575       {
576         if (!needCreate[i])
577           continue;
578 
579         CreateCallbackHandler cb = cbList[i];
580         cb.waitForSuccess();
581         String path = paths.get(i);
582 
583         if (Code.get(cb.getRc()) == Code.NONODE)
584         {
585           String parentPath = HelixUtil.getZkParentPath(path);
586           parentPaths.set(i, parentPath);
587           failOnNoNode = true;
588         }
589         else
590         {
591           // if create succeed or fail on error other than NONODE,
592           // give up
593           needCreate[i] = false;
594 
595           // if succeeds, record what paths we've created
596           if (Code.get(cb.getRc()) == Code.OK && pathsCreated != null)
597           {
598             if (pathsCreated.get(i) == null)
599             {
600               pathsCreated.set(i, new ArrayList<String>());
601             }
602             pathsCreated.get(i).add(path);
603           }
604         }
605       }
606 
607       if (failOnNoNode)
608       {
609         boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length);
610 
611         CreateCallbackHandler[] parentCbList =
612             create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
613         for (int i = 0; i < parentCbList.length; i++)
614         {
615           CreateCallbackHandler parentCb = parentCbList[i];
616           if (parentCb == null)
617             continue;
618 
619           Code rc = Code.get(parentCb.getRc());
620 
621           // if parent is created, retry create child
622           if (rc == Code.OK || rc == Code.NODEEXISTS)
623           {
624             retry = true;
625             break;
626           }
627         }
628       }
629     }
630     while (retry);
631 
632     return cbList;
633   }
634 
635 
636   /**
637    * async create
638    * 
639    * TODO: rename to create
640    */
641   @Override
642   public boolean[] createChildren(List<String> paths, List<T> records, int options)
643   {
644     boolean[] success = new boolean[paths.size()];
645 
646     CreateMode mode = AccessOption.getMode(options);
647     if (mode == null)
648     {
649       LOG.error("Invalid async create mode. options: " + options);
650       return success;
651     }
652 
653     boolean[] needCreate = new boolean[paths.size()];
654     Arrays.fill(needCreate, true);
655     List<List<String>> pathsCreated =
656         new ArrayList<List<String>>(Collections.<List<String>> nCopies(paths.size(), null));
657 
658     long startT = System.nanoTime();
659     try
660     {
661 
662       CreateCallbackHandler[] cbList =
663           create(paths, records, needCreate, pathsCreated, options);
664 
665       for (int i = 0; i < cbList.length; i++)
666       {
667         CreateCallbackHandler cb = cbList[i];
668         success[i] = (Code.get(cb.getRc()) == Code.OK);
669       }
670 
671       return success;
672 
673     }
674     finally
675     {
676       long endT = System.nanoTime();
677       if (LOG.isTraceEnabled())
678       {
679         LOG.trace("create_async, size: " + paths.size() + ", paths: " + paths.get(0)
680           + ",... time: " + (endT - startT) + " ns");
681       }
682     }
683   }
684 
685   /**
686    * async set
687    * 
688    * TODO: rename to set
689    * 
690    */
691   @Override
692   public boolean[] setChildren(List<String> paths, List<T> records, int options)
693   {
694     return set(paths, records, null, null, options);
695   }
696 
697   /**
698    * async set, give up on error other than NoNode
699    * 
700    */
701   boolean[] set(List<String> paths,
702                 List<T> records,
703                 List<List<String>> pathsCreated,
704                 List<Stat> stats,
705                 int options)
706   {
707     if (paths == null || paths.size() == 0)
708     {
709       return new boolean[0];
710     }
711 
712     if ((records != null && records.size() != paths.size())
713         || (pathsCreated != null && pathsCreated.size() != paths.size()))
714     {
715       throw new IllegalArgumentException("paths, records, and pathsCreated should be of same size");
716     }
717 
718     boolean[] success = new boolean[paths.size()];
719 
720     CreateMode mode = AccessOption.getMode(options);
721     if (mode == null)
722     {
723       LOG.error("Invalid async set mode. options: " + options);
724       return success;
725     }
726 
727     List<Stat> setStats =
728         new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
729     SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
730     CreateCallbackHandler[] createCbList = null;
731     boolean[] needSet = new boolean[paths.size()];
732     Arrays.fill(needSet, true);
733 
734     long startT = System.nanoTime();
735 
736     try
737     {
738       boolean retry;
739       do
740       {
741         retry = false;
742 
743         for (int i = 0; i < paths.size(); i++)
744         {
745           if (!needSet[i])
746             continue;
747 
748           String path = paths.get(i);
749           T record = records.get(i);
750           cbList[i] = new SetDataCallbackHandler();
751           _zkClient.asyncSetData(path, record, -1, cbList[i]);
752 
753         }
754 
755         boolean failOnNoNode = false;
756 
757         for (int i = 0; i < cbList.length; i++)
758         {
759           SetDataCallbackHandler cb = cbList[i];
760           cb.waitForSuccess();
761           Code rc = Code.get(cb.getRc());
762           switch (rc)
763           {
764           case OK:
765             setStats.set(i, cb.getStat());
766             needSet[i] = false;
767             break;
768           case NONODE:
769             // if fail on NoNode, try create the node
770             failOnNoNode = true;
771             break;
772           default:
773             // if fail on error other than NoNode, give up
774             needSet[i] = false;
775             break;
776           }
777         }
778 
779         // if failOnNoNode, try create
780         if (failOnNoNode)
781         {
782           boolean[] needCreate = Arrays.copyOf(needSet, needSet.length);
783           createCbList = create(paths, records, needCreate, pathsCreated, options);
784           for (int i = 0; i < createCbList.length; i++)
785           {
786             CreateCallbackHandler createCb = createCbList[i];
787             if (createCb == null)
788             {
789               continue;
790             }
791 
792             Code rc = Code.get(createCb.getRc());
793             switch (rc)
794             {
795             case OK:
796               setStats.set(i, ZNode.ZERO_STAT);
797               needSet[i] = false;
798               break;
799             case NODEEXISTS:
800               retry = true;
801               break;
802             default:
803               // if creation fails on error other than NodeExists
804               // no need to retry set
805               needSet[i] = false;
806               break;
807             }
808           }
809         }
810       }
811       while (retry);
812 
813       // construct return results
814       for (int i = 0; i < cbList.length; i++)
815       {
816         SetDataCallbackHandler cb = cbList[i];
817 
818         Code rc = Code.get(cb.getRc());
819         if (rc == Code.OK)
820         {
821           success[i] = true;
822         }
823         else if (rc == Code.NONODE)
824         {
825           CreateCallbackHandler createCb = createCbList[i];
826           if (Code.get(createCb.getRc()) == Code.OK)
827           {
828             success[i] = true;
829           }
830         }
831       }
832 
833       if (stats != null)
834       {
835         stats.clear();
836         stats.addAll(setStats);
837       }
838 
839       return success;
840     }
841     finally
842     {
843       long endT = System.nanoTime();
844       if (LOG.isTraceEnabled())
845       {
846         LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
847           + ",... time: " + (endT - startT) + " ns");
848       }
849     }
850   }
851 
852   // TODO: rename to update
853   /**
854    * async update
855    */
856   @Override
857   public boolean[] updateChildren(List<String> paths,
858                                   List<DataUpdater<T>> updaters,
859                                   int options)
860   {
861 
862     List<T> updateData = update(paths, updaters, null, null, options);
863     boolean[] success = new boolean[paths.size()]; // init to false
864     for (int i = 0; i < paths.size(); i++)
865     {
866       T data = updateData.get(i);
867       success[i] = (data != null);
868     }
869     return success;
870   }
871 
872   /**
873    * async update
874    * 
875    * return: updatedData on success or null on fail
876    */
877   List<T> update(List<String> paths,
878                  List<DataUpdater<T>> updaters,
879                  List<List<String>> pathsCreated,
880                  List<Stat> stats,
881                  int options)
882   {
883     if (paths == null || paths.size() == 0)
884     {
885       LOG.error("paths is null or empty");
886       return Collections.emptyList();
887     }
888 
889     if (updaters.size() != paths.size()
890         || (pathsCreated != null && pathsCreated.size() != paths.size()))
891     {
892       throw new IllegalArgumentException("paths, updaters, and pathsCreated should be of same size");
893     }
894 
895     List<Stat> setStats =
896         new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
897     List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
898 
899     CreateMode mode = AccessOption.getMode(options);
900     if (mode == null)
901     {
902       LOG.error("Invalid update mode. options: " + options);
903       return updateData;
904     }
905 
906     SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
907     CreateCallbackHandler[] createCbList = null;
908     boolean[] needUpdate = new boolean[paths.size()];
909     Arrays.fill(needUpdate, true);
910 
911     long startT = System.nanoTime();
912 
913     try
914     {
915       boolean retry;
916       do
917       {
918         retry = false;
919         boolean[] needCreate = new boolean[paths.size()]; // init'ed with false
920         boolean failOnNoNode = false;
921 
922         // asycn read all data
923         List<Stat> curStats = new ArrayList<Stat>();
924         List<T> curDataList =
925             get(paths, curStats, Arrays.copyOf(needUpdate, needUpdate.length));
926 
927         // async update
928         List<T> newDataList = new ArrayList<T>();
929         for (int i = 0; i < paths.size(); i++)
930         {
931           if (!needUpdate[i])
932           {
933             newDataList.add(null);
934             continue;
935           }
936           String path = paths.get(i);
937           DataUpdater<T> updater = updaters.get(i);
938           T newData = updater.update(curDataList.get(i));
939           newDataList.add(newData);
940           Stat curStat = curStats.get(i);
941           if (curStat == null)
942           {
943             // node not exists
944             failOnNoNode = true;
945             needCreate[i] = true;
946           }
947           else
948           {
949             cbList[i] = new SetDataCallbackHandler();
950             _zkClient.asyncSetData(path, newData, curStat.getVersion(), cbList[i]);
951           }
952         }
953 
954         // wait for completion
955         boolean failOnBadVersion = false;
956 
957         for (int i = 0; i < paths.size(); i++)
958         {
959           SetDataCallbackHandler cb = cbList[i];
960           if (cb == null)
961             continue;
962 
963           cb.waitForSuccess();
964 
965           switch (Code.get(cb.getRc()))
966           {
967           case OK:
968             updateData.set(i, newDataList.get(i));
969             setStats.set(i, cb.getStat());
970             needUpdate[i] = false;
971             break;
972           case NONODE:
973             failOnNoNode = true;
974             needCreate[i] = true;
975             break;
976           case BADVERSION:
977             failOnBadVersion = true;
978             break;
979           default:
980             // if fail on error other than NoNode or BadVersion
981             // will not retry
982             needUpdate[i] = false;
983             break;
984           }
985         }
986 
987         // if failOnNoNode, try create
988         if (failOnNoNode)
989         {
990           createCbList = create(paths, newDataList, needCreate, pathsCreated, options);
991           for (int i = 0; i < paths.size(); i++)
992           {
993             CreateCallbackHandler createCb = createCbList[i];
994             if (createCb == null)
995             {
996               continue;
997             }
998 
999             switch (Code.get(createCb.getRc()))
1000             {
1001             case OK:
1002               needUpdate[i] = false;
1003               updateData.set(i, newDataList.get(i));
1004               setStats.set(i, ZNode.ZERO_STAT);
1005               break;
1006             case NODEEXISTS:
1007               retry = true;
1008               break;
1009             default:
1010               // if fail on error other than NodeExists
1011               // will not retry
1012               needUpdate[i] = false;
1013               break;
1014             }
1015           }
1016         }
1017 
1018         // if failOnBadVersion, retry
1019         if (failOnBadVersion)
1020         {
1021           retry = true;
1022         }
1023       }
1024       while (retry);
1025 
1026       if (stats != null)
1027       {
1028         stats.clear();
1029         stats.addAll(setStats);
1030       }
1031 
1032       return updateData;
1033     }
1034     finally
1035     {
1036       long endT = System.nanoTime();
1037       if (LOG.isTraceEnabled())
1038       {
1039         LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
1040           + ",... time: " + (endT - startT) + " ns");
1041       }
1042     }
1043 
1044   }
1045 
1046   /**
1047    * async exists
1048    * 
1049    */
1050   @Override
1051   public boolean[] exists(List<String> paths, int options)
1052   {
1053     Stat[] stats = getStats(paths, options);
1054 
1055     boolean[] exists = new boolean[paths.size()];
1056     for (int i = 0; i < paths.size(); i++)
1057     {
1058       exists[i] = (stats[i] != null);
1059     }
1060 
1061     return exists;
1062   }
1063 
1064   /**
1065    * async getStat
1066    * 
1067    */
1068   @Override
1069   public Stat[] getStats(List<String> paths, int options)
1070   {
1071     if (paths == null || paths.size() == 0)
1072     {
1073       LOG.error("paths is null or empty");
1074       return new Stat[0];
1075     }
1076 
1077     Stat[] stats = new Stat[paths.size()];
1078 
1079     long startT = System.nanoTime();
1080 
1081     try
1082     {
1083       ExistsCallbackHandler[] cbList = new ExistsCallbackHandler[paths.size()];
1084       for (int i = 0; i < paths.size(); i++)
1085       {
1086         String path = paths.get(i);
1087         cbList[i] = new ExistsCallbackHandler();
1088         _zkClient.asyncExists(path, cbList[i]);
1089       }
1090 
1091       for (int i = 0; i < cbList.length; i++)
1092       {
1093         ExistsCallbackHandler cb = cbList[i];
1094         cb.waitForSuccess();
1095         stats[i] = cb._stat;
1096       }
1097 
1098       return stats;
1099     }
1100     finally
1101     {
1102       long endT = System.nanoTime();
1103       if (LOG.isTraceEnabled())
1104       {
1105         LOG.trace("exists_async, size: " + paths.size() + ", paths: " + paths.get(0)
1106           + ",... time: " + (endT - startT) + " ns");
1107       }
1108     }
1109   }
1110 
1111   /**
1112    * async remove
1113    * 
1114    */
1115   @Override
1116   public boolean[] remove(List<String> paths, int options)
1117   {
1118     if (paths == null || paths.size() == 0)
1119     {
1120       return new boolean[0];
1121     }
1122 
1123     boolean[] success = new boolean[paths.size()];
1124 
1125     DeleteCallbackHandler[] cbList = new DeleteCallbackHandler[paths.size()];
1126 
1127     long startT = System.nanoTime();
1128 
1129     try
1130     {
1131       for (int i = 0; i < paths.size(); i++)
1132       {
1133         String path = paths.get(i);
1134         cbList[i] = new DeleteCallbackHandler();
1135         _zkClient.asyncDelete(path, cbList[i]);
1136       }
1137 
1138       for (int i = 0; i < cbList.length; i++)
1139       {
1140         DeleteCallbackHandler cb = cbList[i];
1141         cb.waitForSuccess();
1142         success[i] = (cb.getRc() == 0);
1143       }
1144 
1145       return success;
1146     }
1147     finally
1148     {
1149       long endT = System.nanoTime();
1150       if (LOG.isTraceEnabled())
1151       {
1152         LOG.trace("delete_async, size: " + paths.size() + ", paths: " + paths.get(0)
1153           + ",... time: " + (endT - startT) + " ns");
1154       }
1155     }
1156   }
1157 
1158   /**
1159    * Subscribe to zookeeper data changes
1160    */
1161   @Override
1162   public void subscribeDataChanges(String path, IZkDataListener listener)
1163   {
1164     _zkClient.subscribeDataChanges(path, listener);
1165   }
1166 
1167   /**
1168    * Unsubscribe to zookeeper data changes
1169    */
1170   @Override
1171   public void unsubscribeDataChanges(String path, IZkDataListener dataListener)
1172   {
1173     _zkClient.unsubscribeDataChanges(path, dataListener);
1174   }
1175 
1176   /**
1177    * Subscrie to zookeeper data changes
1178    */
1179   @Override
1180   public List<String> subscribeChildChanges(String path, IZkChildListener listener)
1181   {
1182     return _zkClient.subscribeChildChanges(path, listener);
1183   }
1184 
1185   /**
1186    * Unsubscrie to zookeeper data changes
1187    */
1188   @Override
1189   public void unsubscribeChildChanges(String path, IZkChildListener childListener)
1190   {
1191     _zkClient.unsubscribeChildChanges(path, childListener);
1192   }
1193 
1194   // simple test
1195   public static void main(String[] args)
1196   {
1197     ZkClient zkclient = new ZkClient("localhost:2191");
1198     zkclient.setZkSerializer(new ZNRecordSerializer());
1199     ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
1200 
1201     // test async create
1202     List<String> createPaths =
1203         Arrays.asList("/test/child1/child1", "/test/child2/child2");
1204     List<ZNRecord> createRecords =
1205         Arrays.asList(new ZNRecord("child1"), new ZNRecord("child2"));
1206 
1207     boolean[] needCreate = new boolean[createPaths.size()];
1208     Arrays.fill(needCreate, true);
1209     List<List<String>> pathsCreated =
1210         new ArrayList<List<String>>(Collections.<List<String>> nCopies(createPaths.size(),
1211                                                                        null));
1212     accessor.create(createPaths,
1213                     createRecords,
1214                     needCreate,
1215                     pathsCreated,
1216                     AccessOption.PERSISTENT);
1217     System.out.println("pathsCreated: " + pathsCreated);
1218 
1219     // test async set
1220     List<String> setPaths =
1221         Arrays.asList("/test/setChild1/setChild1", "/test/setChild2/setChild2");
1222     List<ZNRecord> setRecords =
1223         Arrays.asList(new ZNRecord("setChild1"), new ZNRecord("setChild2"));
1224 
1225     pathsCreated =
1226         new ArrayList<List<String>>(Collections.<List<String>> nCopies(setPaths.size(),
1227                                                                        null));
1228     boolean[] success =
1229         accessor.set(setPaths, setRecords, pathsCreated, null, AccessOption.PERSISTENT);
1230     System.out.println("pathsCreated: " + pathsCreated);
1231     System.out.println("setSuccess: " + Arrays.toString(success));
1232 
1233     // test async update
1234     List<String> updatePaths =
1235         Arrays.asList("/test/updateChild1/updateChild1", "/test/setChild2/setChild2");
1236     class TestUpdater implements DataUpdater<ZNRecord>
1237     {
1238       final ZNRecord _newData;
1239 
1240       public TestUpdater(ZNRecord newData)
1241       {
1242         _newData = newData;
1243       }
1244 
1245       @Override
1246       public ZNRecord update(ZNRecord currentData)
1247       {
1248         return _newData;
1249 
1250       }
1251     }
1252     List<DataUpdater<ZNRecord>> updaters =
1253         Arrays.asList((DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild1")),
1254                       (DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild2")));
1255 
1256     pathsCreated =
1257         new ArrayList<List<String>>(Collections.<List<String>> nCopies(updatePaths.size(),
1258                                                                        null));
1259 
1260     List<ZNRecord> updateRecords =
1261         accessor.update(updatePaths, updaters, pathsCreated, null, AccessOption.PERSISTENT);
1262     for (int i = 0; i < updatePaths.size(); i++)
1263     {
1264       success[i] = updateRecords.get(i) != null;
1265     }
1266     System.out.println("pathsCreated: " + pathsCreated);
1267     System.out.println("updateSuccess: " + Arrays.toString(success));
1268 
1269     System.out.println("CLOSING");
1270     zkclient.close();
1271   }
1272 
1273   /**
1274    * Reset
1275    */
1276   @Override
1277   public void reset()
1278   {
1279     // Nothing to do
1280   }
1281 }