1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.File;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.Iterator;
27 import java.util.List;
28
29 import org.I0Itec.zkclient.DataUpdater;
30 import org.I0Itec.zkclient.IZkChildListener;
31 import org.I0Itec.zkclient.IZkDataListener;
32 import org.I0Itec.zkclient.exception.ZkBadVersionException;
33 import org.I0Itec.zkclient.exception.ZkException;
34 import org.I0Itec.zkclient.exception.ZkNoNodeException;
35 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
36 import org.apache.helix.AccessOption;
37 import org.apache.helix.BaseDataAccessor;
38 import org.apache.helix.ZNRecord;
39 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
40 import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
41 import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
42 import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
43 import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
44 import org.apache.helix.store.zk.ZNode;
45 import org.apache.helix.util.HelixUtil;
46 import org.apache.log4j.Logger;
47 import org.apache.zookeeper.CreateMode;
48 import org.apache.zookeeper.KeeperException.Code;
49 import org.apache.zookeeper.data.Stat;
50 import org.apache.zookeeper.server.DataTree;
51
52
53 public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
54 {
55 enum RetCode
56 {
57 OK, NODE_EXISTS, ERROR
58 }
59
60 private static Logger LOG = Logger.getLogger(ZkBaseDataAccessor.class);
61
62 private final ZkClient _zkClient;
63
64 public ZkBaseDataAccessor(ZkClient zkClient)
65 {
66 _zkClient = zkClient;
67 }
68
69
70
71
72 @Override
73 public boolean create(String path, T record, int options)
74 {
75 return create(path, record, null, options) == RetCode.OK;
76 }
77
78
79
80
81 public RetCode create(String path, T record, List<String> pathCreated, int options)
82 {
83 CreateMode mode = AccessOption.getMode(options);
84 if (mode == null)
85 {
86 LOG.error("Invalid create mode. options: " + options);
87 return RetCode.ERROR;
88 }
89
90 boolean retry;
91 do
92 {
93 retry = false;
94 try
95 {
96 _zkClient.create(path, record, mode);
97 if (pathCreated != null)
98 pathCreated.add(path);
99
100 return RetCode.OK;
101 }
102 catch (ZkNoNodeException e)
103 {
104
105 String parentPath = HelixUtil.getZkParentPath(path);
106 try
107 {
108 RetCode rc = create(parentPath, null, pathCreated, AccessOption.PERSISTENT);
109 if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS)
110 {
111
112 retry = true;
113 }
114 }
115 catch (Exception e1)
116 {
117 LOG.error("Exception while creating path: " + parentPath, e1);
118 return RetCode.ERROR;
119 }
120 }
121 catch (ZkNodeExistsException e)
122 {
123 LOG.warn("Node already exists. path: " + path);
124 return RetCode.NODE_EXISTS;
125 }
126 catch (Exception e)
127 {
128 LOG.error("Exception while creating path: " + path, e);
129 return RetCode.ERROR;
130 }
131 }
132 while (retry);
133
134 return RetCode.OK;
135 }
136
137
138
139
140 @Override
141 public boolean set(String path, T record, int options)
142 {
143 return set(path, record, null, null, -1, options);
144 }
145
146
147
148
149
150
151
152 public boolean set(String path,
153 T record,
154 List<String> pathsCreated,
155 Stat setstat,
156 int expectVersion,
157 int options)
158 {
159 CreateMode mode = AccessOption.getMode(options);
160 if (mode == null)
161 {
162 LOG.error("Invalid set mode. options: " + options);
163 return false;
164 }
165
166 boolean retry;
167 do
168 {
169 retry = false;
170 try
171 {
172
173 Stat setStat = _zkClient.writeDataGetStat(path, record, expectVersion);
174 if (setstat != null)
175 DataTree.copyStat(setStat, setstat);
176 }
177 catch (ZkNoNodeException e)
178 {
179
180 try
181 {
182 RetCode rc = create(path, record, pathsCreated, options);
183
184
185 switch (rc)
186 {
187 case OK:
188
189 break;
190 case NODE_EXISTS:
191 retry = true;
192 break;
193 default:
194 LOG.error("Fail to set path by creating: " + path);
195 return false;
196 }
197 }
198 catch (Exception e1)
199 {
200 LOG.error("Exception while setting path by creating: " + path, e);
201 return false;
202 }
203 }
204 catch (ZkBadVersionException e)
205 {
206 throw e;
207 }
208 catch (Exception e)
209 {
210 LOG.error("Exception while setting path: " + path, e);
211 return false;
212 }
213 }
214 while (retry);
215
216 return true;
217 }
218
219
220
221
222 @Override
223 public boolean update(String path, DataUpdater<T> updater, int options)
224 {
225 return update(path, updater, null, null, options) != null;
226 }
227
228
229
230
231
232
233 public T update(String path,
234 DataUpdater<T> updater,
235 List<String> createPaths,
236 Stat stat,
237 int options)
238 {
239 CreateMode mode = AccessOption.getMode(options);
240 if (mode == null)
241 {
242 LOG.error("Invalid update mode. options: " + options);
243 return null;
244 }
245
246 boolean retry;
247 T updatedData = null;
248 do
249 {
250 retry = false;
251 try
252 {
253 Stat readStat = new Stat();
254 T oldData = (T) _zkClient.readData(path, readStat);
255 T newData = updater.update(oldData);
256 Stat setStat = _zkClient.writeDataGetStat(path, newData, readStat.getVersion());
257 if (stat != null)
258 {
259 DataTree.copyStat(setStat, stat);
260 }
261
262 updatedData = newData;
263 }
264 catch (ZkBadVersionException e)
265 {
266 retry = true;
267 }
268 catch (ZkNoNodeException e)
269 {
270
271 try
272 {
273 T newData = updater.update(null);
274 RetCode rc = create(path, newData, createPaths, options);
275 switch (rc)
276 {
277 case OK:
278 updatedData = newData;
279 break;
280 case NODE_EXISTS:
281 retry = true;
282 break;
283 default:
284 LOG.error("Fail to update path by creating: " + path);
285 return null;
286 }
287 }
288 catch (Exception e1)
289 {
290 LOG.error("Exception while updating path by creating: " + path, e1);
291 return null;
292 }
293 }
294 catch (Exception e)
295 {
296 LOG.error("Exception while updating path: " + path, e);
297 return null;
298 }
299 }
300 while (retry);
301
302 return updatedData;
303 }
304
305
306
307
308
309 @Override
310 public T get(String path, Stat stat, int options)
311 {
312 T data = null;
313 try
314 {
315 data = (T) _zkClient.readData(path, stat);
316 }
317 catch (ZkNoNodeException e)
318 {
319 if (AccessOption.isThrowExceptionIfNotExist(options))
320 {
321 throw e;
322 }
323 }
324 return data;
325 }
326
327
328
329
330
331 @Override
332 public List<T> get(List<String> paths, List<Stat> stats, int options)
333 {
334 boolean[] needRead = new boolean[paths.size()];
335 Arrays.fill(needRead, true);
336
337 return get(paths, stats, needRead);
338 }
339
340
341
342
343 List<T> get(List<String> paths, List<Stat> stats, boolean[] needRead)
344 {
345 if (paths == null || paths.size() == 0)
346 {
347 return Collections.emptyList();
348 }
349
350
351 if (stats != null)
352 {
353 stats.clear();
354 stats.addAll(Collections.<Stat> nCopies(paths.size(), null));
355 }
356
357 long startT = System.nanoTime();
358
359 try
360 {
361
362 GetDataCallbackHandler[] cbList = new GetDataCallbackHandler[paths.size()];
363 for (int i = 0; i < paths.size(); i++)
364 {
365 if (!needRead[i])
366 continue;
367
368 String path = paths.get(i);
369 cbList[i] = new GetDataCallbackHandler();
370 _zkClient.asyncGetData(path, cbList[i]);
371 }
372
373
374 for (int i = 0; i < cbList.length; i++)
375 {
376 if (!needRead[i])
377 continue;
378
379 GetDataCallbackHandler cb = cbList[i];
380 cb.waitForSuccess();
381 }
382
383
384 List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
385
386 for (int i = 0; i < paths.size(); i++)
387 {
388 if (!needRead[i])
389 continue;
390
391 GetDataCallbackHandler cb = cbList[i];
392 if (Code.get(cb.getRc()) == Code.OK)
393 {
394 @SuppressWarnings("unchecked")
395 T record = (T) _zkClient.deserialize(cb._data, paths.get(i));
396 records.set(i, record);
397 if (stats != null)
398 {
399 stats.set(i, cb._stat);
400 }
401 }
402 }
403
404 return records;
405 }
406 finally
407 {
408 long endT = System.nanoTime();
409 if (LOG.isTraceEnabled())
410 {
411 LOG.trace("getData_async, size: " + paths.size() + ", paths: " + paths.get(0)
412 + ",... time: " + (endT - startT) + " ns");
413 }
414 }
415 }
416
417
418
419
420
421 @Override
422 public List<T> getChildren(String parentPath, List<Stat> stats, int options)
423 {
424 try
425 {
426
427 List<String> childNames = getChildNames(parentPath, options);
428 if (childNames == null || childNames.size() == 0)
429 {
430 return Collections.emptyList();
431 }
432
433 List<String> paths = new ArrayList<String>();
434 for (String childName : childNames)
435 {
436 String path = parentPath + "/" + childName;
437 paths.add(path);
438 }
439
440
441 List<Stat> curStats = new ArrayList<Stat>(paths.size());
442 List<T> records = get(paths, curStats, options);
443 Iterator<T> recordIter = records.iterator();
444 Iterator<Stat> statIter = curStats.iterator();
445 while (statIter.hasNext())
446 {
447 recordIter.next();
448 if (statIter.next() == null)
449 {
450 statIter.remove();
451 recordIter.remove();
452 }
453 }
454
455 if (stats != null)
456 {
457 stats.clear();
458 stats.addAll(curStats);
459 }
460
461 return records;
462 }
463 catch (ZkNoNodeException e)
464 {
465 return Collections.emptyList();
466 }
467 }
468
469
470
471
472
473
474 @Override
475 public List<String> getChildNames(String parentPath, int options)
476 {
477 try
478 {
479 List<String> childNames = _zkClient.getChildren(parentPath);
480 Collections.sort(childNames);
481 return childNames;
482 }
483 catch (ZkNoNodeException e)
484 {
485 return null;
486 }
487 }
488
489
490
491
492
493 @Override
494 public boolean exists(String path, int options)
495 {
496 return _zkClient.exists(path);
497 }
498
499
500
501
502
503 @Override
504 public Stat getStat(String path, int options)
505 {
506 return _zkClient.getStat(path);
507 }
508
509
510
511
512
513 @Override
514 public boolean remove(String path, int options)
515 {
516 try
517 {
518
519 _zkClient.delete(path);
520 }
521 catch (ZkException e)
522 {
523 _zkClient.deleteRecursive(path);
524 }
525 return true;
526 }
527
528
529
530
531
532 CreateCallbackHandler[] create(List<String> paths,
533 List<T> records,
534 boolean[] needCreate,
535 List<List<String>> pathsCreated,
536 int options)
537 {
538 if ((records != null && records.size() != paths.size())
539 || needCreate.length != paths.size()
540 || (pathsCreated != null && pathsCreated.size() != paths.size()))
541 {
542 throw new IllegalArgumentException("paths, records, needCreate, and pathsCreated should be of same size");
543 }
544
545 CreateCallbackHandler[] cbList = new CreateCallbackHandler[paths.size()];
546
547 CreateMode mode = AccessOption.getMode(options);
548 if (mode == null)
549 {
550 LOG.error("Invalid async set mode. options: " + options);
551 return cbList;
552 }
553
554 boolean retry;
555 do
556 {
557 retry = false;
558
559 for (int i = 0; i < paths.size(); i++)
560 {
561 if (!needCreate[i])
562 continue;
563
564 String path = paths.get(i);
565 T record = records == null ? null : records.get(i);
566 cbList[i] = new CreateCallbackHandler();
567 _zkClient.asyncCreate(path, record, mode, cbList[i]);
568 }
569
570 List<String> parentPaths =
571 new ArrayList<String>(Collections.<String> nCopies(paths.size(), null));
572 boolean failOnNoNode = false;
573
574 for (int i = 0; i < paths.size(); i++)
575 {
576 if (!needCreate[i])
577 continue;
578
579 CreateCallbackHandler cb = cbList[i];
580 cb.waitForSuccess();
581 String path = paths.get(i);
582
583 if (Code.get(cb.getRc()) == Code.NONODE)
584 {
585 String parentPath = HelixUtil.getZkParentPath(path);
586 parentPaths.set(i, parentPath);
587 failOnNoNode = true;
588 }
589 else
590 {
591
592
593 needCreate[i] = false;
594
595
596 if (Code.get(cb.getRc()) == Code.OK && pathsCreated != null)
597 {
598 if (pathsCreated.get(i) == null)
599 {
600 pathsCreated.set(i, new ArrayList<String>());
601 }
602 pathsCreated.get(i).add(path);
603 }
604 }
605 }
606
607 if (failOnNoNode)
608 {
609 boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length);
610
611 CreateCallbackHandler[] parentCbList =
612 create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
613 for (int i = 0; i < parentCbList.length; i++)
614 {
615 CreateCallbackHandler parentCb = parentCbList[i];
616 if (parentCb == null)
617 continue;
618
619 Code rc = Code.get(parentCb.getRc());
620
621
622 if (rc == Code.OK || rc == Code.NODEEXISTS)
623 {
624 retry = true;
625 break;
626 }
627 }
628 }
629 }
630 while (retry);
631
632 return cbList;
633 }
634
635
636
637
638
639
640
641 @Override
642 public boolean[] createChildren(List<String> paths, List<T> records, int options)
643 {
644 boolean[] success = new boolean[paths.size()];
645
646 CreateMode mode = AccessOption.getMode(options);
647 if (mode == null)
648 {
649 LOG.error("Invalid async create mode. options: " + options);
650 return success;
651 }
652
653 boolean[] needCreate = new boolean[paths.size()];
654 Arrays.fill(needCreate, true);
655 List<List<String>> pathsCreated =
656 new ArrayList<List<String>>(Collections.<List<String>> nCopies(paths.size(), null));
657
658 long startT = System.nanoTime();
659 try
660 {
661
662 CreateCallbackHandler[] cbList =
663 create(paths, records, needCreate, pathsCreated, options);
664
665 for (int i = 0; i < cbList.length; i++)
666 {
667 CreateCallbackHandler cb = cbList[i];
668 success[i] = (Code.get(cb.getRc()) == Code.OK);
669 }
670
671 return success;
672
673 }
674 finally
675 {
676 long endT = System.nanoTime();
677 if (LOG.isTraceEnabled())
678 {
679 LOG.trace("create_async, size: " + paths.size() + ", paths: " + paths.get(0)
680 + ",... time: " + (endT - startT) + " ns");
681 }
682 }
683 }
684
685
686
687
688
689
690
691 @Override
692 public boolean[] setChildren(List<String> paths, List<T> records, int options)
693 {
694 return set(paths, records, null, null, options);
695 }
696
697
698
699
700
701 boolean[] set(List<String> paths,
702 List<T> records,
703 List<List<String>> pathsCreated,
704 List<Stat> stats,
705 int options)
706 {
707 if (paths == null || paths.size() == 0)
708 {
709 return new boolean[0];
710 }
711
712 if ((records != null && records.size() != paths.size())
713 || (pathsCreated != null && pathsCreated.size() != paths.size()))
714 {
715 throw new IllegalArgumentException("paths, records, and pathsCreated should be of same size");
716 }
717
718 boolean[] success = new boolean[paths.size()];
719
720 CreateMode mode = AccessOption.getMode(options);
721 if (mode == null)
722 {
723 LOG.error("Invalid async set mode. options: " + options);
724 return success;
725 }
726
727 List<Stat> setStats =
728 new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
729 SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
730 CreateCallbackHandler[] createCbList = null;
731 boolean[] needSet = new boolean[paths.size()];
732 Arrays.fill(needSet, true);
733
734 long startT = System.nanoTime();
735
736 try
737 {
738 boolean retry;
739 do
740 {
741 retry = false;
742
743 for (int i = 0; i < paths.size(); i++)
744 {
745 if (!needSet[i])
746 continue;
747
748 String path = paths.get(i);
749 T record = records.get(i);
750 cbList[i] = new SetDataCallbackHandler();
751 _zkClient.asyncSetData(path, record, -1, cbList[i]);
752
753 }
754
755 boolean failOnNoNode = false;
756
757 for (int i = 0; i < cbList.length; i++)
758 {
759 SetDataCallbackHandler cb = cbList[i];
760 cb.waitForSuccess();
761 Code rc = Code.get(cb.getRc());
762 switch (rc)
763 {
764 case OK:
765 setStats.set(i, cb.getStat());
766 needSet[i] = false;
767 break;
768 case NONODE:
769
770 failOnNoNode = true;
771 break;
772 default:
773
774 needSet[i] = false;
775 break;
776 }
777 }
778
779
780 if (failOnNoNode)
781 {
782 boolean[] needCreate = Arrays.copyOf(needSet, needSet.length);
783 createCbList = create(paths, records, needCreate, pathsCreated, options);
784 for (int i = 0; i < createCbList.length; i++)
785 {
786 CreateCallbackHandler createCb = createCbList[i];
787 if (createCb == null)
788 {
789 continue;
790 }
791
792 Code rc = Code.get(createCb.getRc());
793 switch (rc)
794 {
795 case OK:
796 setStats.set(i, ZNode.ZERO_STAT);
797 needSet[i] = false;
798 break;
799 case NODEEXISTS:
800 retry = true;
801 break;
802 default:
803
804
805 needSet[i] = false;
806 break;
807 }
808 }
809 }
810 }
811 while (retry);
812
813
814 for (int i = 0; i < cbList.length; i++)
815 {
816 SetDataCallbackHandler cb = cbList[i];
817
818 Code rc = Code.get(cb.getRc());
819 if (rc == Code.OK)
820 {
821 success[i] = true;
822 }
823 else if (rc == Code.NONODE)
824 {
825 CreateCallbackHandler createCb = createCbList[i];
826 if (Code.get(createCb.getRc()) == Code.OK)
827 {
828 success[i] = true;
829 }
830 }
831 }
832
833 if (stats != null)
834 {
835 stats.clear();
836 stats.addAll(setStats);
837 }
838
839 return success;
840 }
841 finally
842 {
843 long endT = System.nanoTime();
844 if (LOG.isTraceEnabled())
845 {
846 LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
847 + ",... time: " + (endT - startT) + " ns");
848 }
849 }
850 }
851
852
853
854
855
856 @Override
857 public boolean[] updateChildren(List<String> paths,
858 List<DataUpdater<T>> updaters,
859 int options)
860 {
861
862 List<T> updateData = update(paths, updaters, null, null, options);
863 boolean[] success = new boolean[paths.size()];
864 for (int i = 0; i < paths.size(); i++)
865 {
866 T data = updateData.get(i);
867 success[i] = (data != null);
868 }
869 return success;
870 }
871
872
873
874
875
876
877 List<T> update(List<String> paths,
878 List<DataUpdater<T>> updaters,
879 List<List<String>> pathsCreated,
880 List<Stat> stats,
881 int options)
882 {
883 if (paths == null || paths.size() == 0)
884 {
885 LOG.error("paths is null or empty");
886 return Collections.emptyList();
887 }
888
889 if (updaters.size() != paths.size()
890 || (pathsCreated != null && pathsCreated.size() != paths.size()))
891 {
892 throw new IllegalArgumentException("paths, updaters, and pathsCreated should be of same size");
893 }
894
895 List<Stat> setStats =
896 new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
897 List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
898
899 CreateMode mode = AccessOption.getMode(options);
900 if (mode == null)
901 {
902 LOG.error("Invalid update mode. options: " + options);
903 return updateData;
904 }
905
906 SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
907 CreateCallbackHandler[] createCbList = null;
908 boolean[] needUpdate = new boolean[paths.size()];
909 Arrays.fill(needUpdate, true);
910
911 long startT = System.nanoTime();
912
913 try
914 {
915 boolean retry;
916 do
917 {
918 retry = false;
919 boolean[] needCreate = new boolean[paths.size()];
920 boolean failOnNoNode = false;
921
922
923 List<Stat> curStats = new ArrayList<Stat>();
924 List<T> curDataList =
925 get(paths, curStats, Arrays.copyOf(needUpdate, needUpdate.length));
926
927
928 List<T> newDataList = new ArrayList<T>();
929 for (int i = 0; i < paths.size(); i++)
930 {
931 if (!needUpdate[i])
932 {
933 newDataList.add(null);
934 continue;
935 }
936 String path = paths.get(i);
937 DataUpdater<T> updater = updaters.get(i);
938 T newData = updater.update(curDataList.get(i));
939 newDataList.add(newData);
940 Stat curStat = curStats.get(i);
941 if (curStat == null)
942 {
943
944 failOnNoNode = true;
945 needCreate[i] = true;
946 }
947 else
948 {
949 cbList[i] = new SetDataCallbackHandler();
950 _zkClient.asyncSetData(path, newData, curStat.getVersion(), cbList[i]);
951 }
952 }
953
954
955 boolean failOnBadVersion = false;
956
957 for (int i = 0; i < paths.size(); i++)
958 {
959 SetDataCallbackHandler cb = cbList[i];
960 if (cb == null)
961 continue;
962
963 cb.waitForSuccess();
964
965 switch (Code.get(cb.getRc()))
966 {
967 case OK:
968 updateData.set(i, newDataList.get(i));
969 setStats.set(i, cb.getStat());
970 needUpdate[i] = false;
971 break;
972 case NONODE:
973 failOnNoNode = true;
974 needCreate[i] = true;
975 break;
976 case BADVERSION:
977 failOnBadVersion = true;
978 break;
979 default:
980
981
982 needUpdate[i] = false;
983 break;
984 }
985 }
986
987
988 if (failOnNoNode)
989 {
990 createCbList = create(paths, newDataList, needCreate, pathsCreated, options);
991 for (int i = 0; i < paths.size(); i++)
992 {
993 CreateCallbackHandler createCb = createCbList[i];
994 if (createCb == null)
995 {
996 continue;
997 }
998
999 switch (Code.get(createCb.getRc()))
1000 {
1001 case OK:
1002 needUpdate[i] = false;
1003 updateData.set(i, newDataList.get(i));
1004 setStats.set(i, ZNode.ZERO_STAT);
1005 break;
1006 case NODEEXISTS:
1007 retry = true;
1008 break;
1009 default:
1010
1011
1012 needUpdate[i] = false;
1013 break;
1014 }
1015 }
1016 }
1017
1018
1019 if (failOnBadVersion)
1020 {
1021 retry = true;
1022 }
1023 }
1024 while (retry);
1025
1026 if (stats != null)
1027 {
1028 stats.clear();
1029 stats.addAll(setStats);
1030 }
1031
1032 return updateData;
1033 }
1034 finally
1035 {
1036 long endT = System.nanoTime();
1037 if (LOG.isTraceEnabled())
1038 {
1039 LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
1040 + ",... time: " + (endT - startT) + " ns");
1041 }
1042 }
1043
1044 }
1045
1046
1047
1048
1049
1050 @Override
1051 public boolean[] exists(List<String> paths, int options)
1052 {
1053 Stat[] stats = getStats(paths, options);
1054
1055 boolean[] exists = new boolean[paths.size()];
1056 for (int i = 0; i < paths.size(); i++)
1057 {
1058 exists[i] = (stats[i] != null);
1059 }
1060
1061 return exists;
1062 }
1063
1064
1065
1066
1067
1068 @Override
1069 public Stat[] getStats(List<String> paths, int options)
1070 {
1071 if (paths == null || paths.size() == 0)
1072 {
1073 LOG.error("paths is null or empty");
1074 return new Stat[0];
1075 }
1076
1077 Stat[] stats = new Stat[paths.size()];
1078
1079 long startT = System.nanoTime();
1080
1081 try
1082 {
1083 ExistsCallbackHandler[] cbList = new ExistsCallbackHandler[paths.size()];
1084 for (int i = 0; i < paths.size(); i++)
1085 {
1086 String path = paths.get(i);
1087 cbList[i] = new ExistsCallbackHandler();
1088 _zkClient.asyncExists(path, cbList[i]);
1089 }
1090
1091 for (int i = 0; i < cbList.length; i++)
1092 {
1093 ExistsCallbackHandler cb = cbList[i];
1094 cb.waitForSuccess();
1095 stats[i] = cb._stat;
1096 }
1097
1098 return stats;
1099 }
1100 finally
1101 {
1102 long endT = System.nanoTime();
1103 if (LOG.isTraceEnabled())
1104 {
1105 LOG.trace("exists_async, size: " + paths.size() + ", paths: " + paths.get(0)
1106 + ",... time: " + (endT - startT) + " ns");
1107 }
1108 }
1109 }
1110
1111
1112
1113
1114
1115 @Override
1116 public boolean[] remove(List<String> paths, int options)
1117 {
1118 if (paths == null || paths.size() == 0)
1119 {
1120 return new boolean[0];
1121 }
1122
1123 boolean[] success = new boolean[paths.size()];
1124
1125 DeleteCallbackHandler[] cbList = new DeleteCallbackHandler[paths.size()];
1126
1127 long startT = System.nanoTime();
1128
1129 try
1130 {
1131 for (int i = 0; i < paths.size(); i++)
1132 {
1133 String path = paths.get(i);
1134 cbList[i] = new DeleteCallbackHandler();
1135 _zkClient.asyncDelete(path, cbList[i]);
1136 }
1137
1138 for (int i = 0; i < cbList.length; i++)
1139 {
1140 DeleteCallbackHandler cb = cbList[i];
1141 cb.waitForSuccess();
1142 success[i] = (cb.getRc() == 0);
1143 }
1144
1145 return success;
1146 }
1147 finally
1148 {
1149 long endT = System.nanoTime();
1150 if (LOG.isTraceEnabled())
1151 {
1152 LOG.trace("delete_async, size: " + paths.size() + ", paths: " + paths.get(0)
1153 + ",... time: " + (endT - startT) + " ns");
1154 }
1155 }
1156 }
1157
1158
1159
1160
1161 @Override
1162 public void subscribeDataChanges(String path, IZkDataListener listener)
1163 {
1164 _zkClient.subscribeDataChanges(path, listener);
1165 }
1166
1167
1168
1169
1170 @Override
1171 public void unsubscribeDataChanges(String path, IZkDataListener dataListener)
1172 {
1173 _zkClient.unsubscribeDataChanges(path, dataListener);
1174 }
1175
1176
1177
1178
1179 @Override
1180 public List<String> subscribeChildChanges(String path, IZkChildListener listener)
1181 {
1182 return _zkClient.subscribeChildChanges(path, listener);
1183 }
1184
1185
1186
1187
1188 @Override
1189 public void unsubscribeChildChanges(String path, IZkChildListener childListener)
1190 {
1191 _zkClient.unsubscribeChildChanges(path, childListener);
1192 }
1193
1194
1195 public static void main(String[] args)
1196 {
1197 ZkClient zkclient = new ZkClient("localhost:2191");
1198 zkclient.setZkSerializer(new ZNRecordSerializer());
1199 ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
1200
1201
1202 List<String> createPaths =
1203 Arrays.asList("/test/child1/child1", "/test/child2/child2");
1204 List<ZNRecord> createRecords =
1205 Arrays.asList(new ZNRecord("child1"), new ZNRecord("child2"));
1206
1207 boolean[] needCreate = new boolean[createPaths.size()];
1208 Arrays.fill(needCreate, true);
1209 List<List<String>> pathsCreated =
1210 new ArrayList<List<String>>(Collections.<List<String>> nCopies(createPaths.size(),
1211 null));
1212 accessor.create(createPaths,
1213 createRecords,
1214 needCreate,
1215 pathsCreated,
1216 AccessOption.PERSISTENT);
1217 System.out.println("pathsCreated: " + pathsCreated);
1218
1219
1220 List<String> setPaths =
1221 Arrays.asList("/test/setChild1/setChild1", "/test/setChild2/setChild2");
1222 List<ZNRecord> setRecords =
1223 Arrays.asList(new ZNRecord("setChild1"), new ZNRecord("setChild2"));
1224
1225 pathsCreated =
1226 new ArrayList<List<String>>(Collections.<List<String>> nCopies(setPaths.size(),
1227 null));
1228 boolean[] success =
1229 accessor.set(setPaths, setRecords, pathsCreated, null, AccessOption.PERSISTENT);
1230 System.out.println("pathsCreated: " + pathsCreated);
1231 System.out.println("setSuccess: " + Arrays.toString(success));
1232
1233
1234 List<String> updatePaths =
1235 Arrays.asList("/test/updateChild1/updateChild1", "/test/setChild2/setChild2");
1236 class TestUpdater implements DataUpdater<ZNRecord>
1237 {
1238 final ZNRecord _newData;
1239
1240 public TestUpdater(ZNRecord newData)
1241 {
1242 _newData = newData;
1243 }
1244
1245 @Override
1246 public ZNRecord update(ZNRecord currentData)
1247 {
1248 return _newData;
1249
1250 }
1251 }
1252 List<DataUpdater<ZNRecord>> updaters =
1253 Arrays.asList((DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild1")),
1254 (DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild2")));
1255
1256 pathsCreated =
1257 new ArrayList<List<String>>(Collections.<List<String>> nCopies(updatePaths.size(),
1258 null));
1259
1260 List<ZNRecord> updateRecords =
1261 accessor.update(updatePaths, updaters, pathsCreated, null, AccessOption.PERSISTENT);
1262 for (int i = 0; i < updatePaths.size(); i++)
1263 {
1264 success[i] = updateRecords.get(i) != null;
1265 }
1266 System.out.println("pathsCreated: " + pathsCreated);
1267 System.out.println("updateSuccess: " + Arrays.toString(success));
1268
1269 System.out.println("CLOSING");
1270 zkclient.close();
1271 }
1272
1273
1274
1275
1276 @Override
1277 public void reset()
1278 {
1279
1280 }
1281 }