1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import java.util.Comparator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.TreeMap;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.locks.ReentrantLock;
31
32 import org.I0Itec.zkclient.DataUpdater;
33 import org.I0Itec.zkclient.IZkChildListener;
34 import org.I0Itec.zkclient.IZkDataListener;
35 import org.I0Itec.zkclient.exception.ZkNoNodeException;
36 import org.I0Itec.zkclient.serialize.ZkSerializer;
37 import org.apache.helix.AccessOption;
38 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
39 import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
40 import org.apache.helix.store.HelixPropertyListener;
41 import org.apache.helix.store.HelixPropertyStore;
42 import org.apache.helix.store.zk.ZNode;
43 import org.apache.log4j.Logger;
44 import org.apache.zookeeper.KeeperException.Code;
45 import org.apache.zookeeper.common.PathUtils;
46 import org.apache.zookeeper.data.Stat;
47 import org.apache.zookeeper.server.DataTree;
48
49
50 public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
51 {
52 private static final Logger LOG =
53 Logger.getLogger(ZkCacheBaseDataAccessor.class);
54
55 protected WriteThroughCache<T> _wtCache;
56 protected ZkCallbackCache<T> _zkCache;
57
58 final ZkBaseDataAccessor<T> _baseAccessor;
59 final Map<String, Cache<T>> _cacheMap;
60
61 final String _chrootPath;
62 final List<String> _wtCachePaths;
63 final List<String> _zkCachePaths;
64
65 final HelixGroupCommit<T> _groupCommit = new HelixGroupCommit<T>();
66
67
68 private final ReentrantLock _eventLock = new ReentrantLock();
69 private ZkCacheEventThread _eventThread;
70
71 private ZkClient _zkclient = null;
72
73 public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
74 List<String> wtCachePaths)
75 {
76 this(baseAccessor, null, wtCachePaths, null);
77 }
78
79 public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
80 String chrootPath,
81 List<String> wtCachePaths,
82 List<String> zkCachePaths)
83 {
84 _baseAccessor = baseAccessor;
85
86 if (chrootPath == null || chrootPath.equals("/"))
87 {
88 _chrootPath = null;
89 }
90 else
91 {
92 PathUtils.validatePath(chrootPath);
93 _chrootPath = chrootPath;
94 }
95
96 _wtCachePaths = wtCachePaths;
97 _zkCachePaths = zkCachePaths;
98
99
100
101
102 _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
103 {
104 @Override
105 public int compare(String o1, String o2)
106 {
107 int len1 = o1.split("/").length;
108 int len2 = o2.split("/").length;
109 return len1 - len2;
110 }
111 });
112
113 start();
114 }
115
116 public ZkCacheBaseDataAccessor(String zkAddress,
117 ZkSerializer serializer,
118 String chrootPath,
119 List<String> wtCachePaths,
120 List<String> zkCachePaths)
121 {
122 _zkclient =
123 new ZkClient(zkAddress,
124 ZkClient.DEFAULT_SESSION_TIMEOUT,
125 ZkClient.DEFAULT_CONNECTION_TIMEOUT,
126 serializer);
127 _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT,
128 TimeUnit.MILLISECONDS);
129 _baseAccessor = new ZkBaseDataAccessor<T>(_zkclient);
130
131 if (chrootPath == null || chrootPath.equals("/"))
132 {
133 _chrootPath = null;
134 }
135 else
136 {
137 PathUtils.validatePath(chrootPath);
138 _chrootPath = chrootPath;
139 }
140
141 _wtCachePaths = wtCachePaths;
142 _zkCachePaths = zkCachePaths;
143
144
145
146
147 _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
148 {
149 @Override
150 public int compare(String o1, String o2)
151 {
152 int len1 = o1.split("/").length;
153 int len2 = o2.split("/").length;
154 return len1 - len2;
155 }
156 });
157
158 start();
159 }
160
161 private String prependChroot(String clientPath)
162 {
163 PathUtils.validatePath(clientPath);
164
165 if (_chrootPath != null)
166 {
167
168 if (clientPath.length() == 1)
169 {
170 return _chrootPath;
171 }
172 return _chrootPath + clientPath;
173 }
174 else
175 {
176 return clientPath;
177 }
178 }
179
180 private List<String> prependChroot(List<String> clientPaths)
181 {
182 List<String> serverPaths = new ArrayList<String>();
183 for (String clientPath : clientPaths)
184 {
185 serverPaths.add(prependChroot(clientPath));
186 }
187 return serverPaths;
188 }
189
190
191
192
193 private String firstCachePath(List<String> paths)
194 {
195 for (String cachePath : _cacheMap.keySet())
196 {
197 for (String path : paths)
198 {
199 if (path.startsWith(cachePath))
200 {
201 return path;
202 }
203 }
204 }
205 return null;
206 }
207
208 private Cache<T> getCache(String path)
209 {
210 for (String cachePath : _cacheMap.keySet())
211 {
212 if (path.startsWith(cachePath))
213 {
214 return _cacheMap.get(cachePath);
215 }
216 }
217
218 return null;
219 }
220
221 private Cache<T> getCache(List<String> paths)
222 {
223 Cache<T> cache = null;
224 for (String path : paths)
225 {
226 for (String cachePath : _cacheMap.keySet())
227 {
228 if (cache == null && path.startsWith(cachePath))
229 {
230 cache = _cacheMap.get(cachePath);
231 }
232 else if (cache != null && cache != _cacheMap.get(cachePath))
233 {
234 throw new IllegalArgumentException("Couldn't do cross-cache async operations. paths: "
235 + paths);
236 }
237 }
238 }
239
240 return cache;
241 }
242
243 private void updateCache(Cache<T> cache,
244 List<String> createPaths,
245 boolean success,
246 String updatePath,
247 T data,
248 Stat stat)
249 {
250 if (createPaths == null || createPaths.isEmpty())
251 {
252 if (success)
253 {
254 cache.update(updatePath, data, stat);
255 }
256 }
257 else
258 {
259 String firstPath = firstCachePath(createPaths);
260 if (firstPath != null)
261 {
262 cache.updateRecursive(firstPath);
263 }
264 }
265 }
266
267 @Override
268 public boolean create(String path, T data, int options)
269 {
270 String clientPath = path;
271 String serverPath = prependChroot(clientPath);
272
273 Cache<T> cache = getCache(serverPath);
274 if (cache != null)
275 {
276 try
277 {
278 cache.lockWrite();
279 List<String> pathsCreated = new ArrayList<String>();
280 RetCode rc = _baseAccessor.create(serverPath, data, pathsCreated, options);
281 boolean success = (rc == RetCode.OK);
282
283 updateCache(cache, pathsCreated, success, serverPath, data, ZNode.ZERO_STAT);
284
285 return success;
286 }
287 finally
288 {
289 cache.unlockWrite();
290 }
291 }
292
293
294 return _baseAccessor.create(serverPath, data, options);
295 }
296
297 @Override
298 public boolean set(String path, T data, int options)
299 {
300 String clientPath = path;
301 String serverPath = prependChroot(clientPath);
302
303 Cache<T> cache = getCache(serverPath);
304 if (cache != null)
305 {
306 try
307 {
308 cache.lockWrite();
309 Stat setStat = new Stat();
310 List<String> pathsCreated = new ArrayList<String>();
311 boolean success =
312 _baseAccessor.set(serverPath, data, pathsCreated, setStat, -1, options);
313
314 updateCache(cache, pathsCreated, success, serverPath, data, setStat);
315
316 return success;
317 }
318 finally
319 {
320 cache.unlockWrite();
321 }
322 }
323
324
325 return _baseAccessor.set(serverPath, data, options);
326 }
327
328 @Override
329 public boolean update(String path, DataUpdater<T> updater, int options)
330 {
331 String clientPath = path;
332 String serverPath = prependChroot(clientPath);
333
334 Cache<T> cache = getCache(serverPath);
335
336 if (cache != null)
337 {
338 try
339 {
340 cache.lockWrite();
341 Stat setStat = new Stat();
342 List<String> pathsCreated = new ArrayList<String>();
343 T updateData =
344 _baseAccessor.update(serverPath, updater, pathsCreated, setStat, options);
345 boolean success = (updateData != null);
346 updateCache(cache, pathsCreated, success, serverPath, updateData, setStat);
347
348 return success;
349 }
350 finally
351 {
352 cache.unlockWrite();
353 }
354 }
355
356
357 return _groupCommit.commit(_baseAccessor, options, serverPath, updater);
358
359 }
360
361 @Override
362 public boolean exists(String path, int options)
363 {
364 String clientPath = path;
365 String serverPath = prependChroot(clientPath);
366
367 Cache<T> cache = getCache(serverPath);
368 if (cache != null)
369 {
370 boolean exists = cache.exists(serverPath);
371 if (exists)
372 {
373 return true;
374 }
375 }
376
377
378 return _baseAccessor.exists(serverPath, options);
379 }
380
381 @Override
382 public boolean remove(String path, int options)
383 {
384 String clientPath = path;
385 String serverPath = prependChroot(clientPath);
386
387 Cache<T> cache = getCache(serverPath);
388 if (cache != null)
389 {
390 try
391 {
392 cache.lockWrite();
393
394 boolean success = _baseAccessor.remove(serverPath, options);
395 if (success)
396 {
397 cache.purgeRecursive(serverPath);
398 }
399
400 return success;
401 }
402 finally
403 {
404 cache.unlockWrite();
405 }
406 }
407
408
409 return _baseAccessor.remove(serverPath, options);
410 }
411
412 @Override
413 public T get(String path, Stat stat, int options)
414 {
415 String clientPath = path;
416 String serverPath = prependChroot(clientPath);
417
418 Cache<T> cache = getCache(serverPath);
419 if (cache != null)
420 {
421 T record = null;
422 ZNode znode = cache.get(serverPath);
423
424 if (znode != null)
425 {
426
427 record = ((T) znode.getData());
428 if (stat != null)
429 {
430 DataTree.copyStat(znode.getStat(), stat);
431 }
432 return record;
433
434 }
435 else
436 {
437
438 try
439 {
440 cache.lockWrite();
441 record = _baseAccessor.get(serverPath, stat, options | AccessOption.THROW_EXCEPTION_IFNOTEXIST);
442 cache.update(serverPath, record, stat);
443 }
444 catch (ZkNoNodeException e)
445 {
446 if (AccessOption.isThrowExceptionIfNotExist(options))
447 {
448 throw e;
449 }
450 }
451 finally
452 {
453 cache.unlockWrite();
454 }
455
456 return record;
457 }
458 }
459
460
461 return _baseAccessor.get(serverPath, stat, options);
462 }
463
464 @Override
465 public Stat getStat(String path, int options)
466 {
467 String clientPath = path;
468 String serverPath = prependChroot(clientPath);
469
470 Cache<T> cache = getCache(serverPath);
471 if (cache != null)
472 {
473 Stat stat = new Stat();
474 ZNode znode = cache.get(serverPath);
475
476 if (znode != null)
477 {
478 return znode.getStat();
479
480 }
481 else
482 {
483
484 try
485 {
486 cache.lockWrite();
487 T data = _baseAccessor.get(serverPath, stat, options);
488 cache.update(serverPath, data, stat);
489 }
490 catch (ZkNoNodeException e)
491 {
492 return null;
493 }
494 finally
495 {
496 cache.unlockWrite();
497 }
498
499 return stat;
500 }
501 }
502
503
504 return _baseAccessor.getStat(serverPath, options);
505 }
506
507 @Override
508 public boolean[] createChildren(List<String> paths, List<T> records, int options)
509 {
510 final int size = paths.size();
511 List<String> serverPaths = prependChroot(paths);
512
513 Cache<T> cache = getCache(serverPaths);
514 if (cache != null)
515 {
516 try
517 {
518 cache.lockWrite();
519 boolean[] needCreate = new boolean[size];
520 Arrays.fill(needCreate, true);
521 List<List<String>> pathsCreatedList =
522 new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
523 CreateCallbackHandler[] createCbList =
524 _baseAccessor.create(serverPaths,
525 records,
526 needCreate,
527 pathsCreatedList,
528 options);
529
530 boolean[] success = new boolean[size];
531 for (int i = 0; i < size; i++)
532 {
533 CreateCallbackHandler cb = createCbList[i];
534 success[i] = (Code.get(cb.getRc()) == Code.OK);
535
536 updateCache(cache,
537 pathsCreatedList.get(i),
538 success[i],
539 serverPaths.get(i),
540 records.get(i),
541 ZNode.ZERO_STAT);
542 }
543
544 return success;
545 }
546 finally
547 {
548 cache.unlockWrite();
549 }
550 }
551
552
553 return _baseAccessor.createChildren(serverPaths, records, options);
554 }
555
556 @Override
557 public boolean[] setChildren(List<String> paths, List<T> records, int options)
558 {
559 final int size = paths.size();
560 List<String> serverPaths = prependChroot(paths);
561
562 Cache<T> cache = getCache(serverPaths);
563 if (cache != null)
564 {
565 try
566 {
567 cache.lockWrite();
568 List<Stat> setStats = new ArrayList<Stat>();
569 List<List<String>> pathsCreatedList =
570 new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
571 boolean[] success =
572 _baseAccessor.set(serverPaths, records, pathsCreatedList, setStats, options);
573
574 for (int i = 0; i < size; i++)
575 {
576 updateCache(cache,
577 pathsCreatedList.get(i),
578 success[i],
579 serverPaths.get(i),
580 records.get(i),
581 setStats.get(i));
582 }
583
584 return success;
585 }
586 finally
587 {
588 cache.unlockWrite();
589 }
590 }
591
592 return _baseAccessor.setChildren(serverPaths, records, options);
593 }
594
595 @Override
596 public boolean[] updateChildren(List<String> paths,
597 List<DataUpdater<T>> updaters,
598 int options)
599 {
600 final int size = paths.size();
601 List<String> serverPaths = prependChroot(paths);
602
603 Cache<T> cache = getCache(serverPaths);
604 if (cache != null)
605 {
606 try
607 {
608 cache.lockWrite();
609
610 List<Stat> setStats = new ArrayList<Stat>();
611 boolean[] success = new boolean[size];
612 List<List<String>> pathsCreatedList =
613 new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
614 List<T> updateData =
615 _baseAccessor.update(serverPaths,
616 updaters,
617 pathsCreatedList,
618 setStats,
619 options);
620
621
622
623
624
625
626
627 for (int i = 0; i < size; i++)
628 {
629 success[i] = (updateData.get(i) != null);
630 updateCache(cache,
631 pathsCreatedList.get(i),
632 success[i],
633 serverPaths.get(i),
634 updateData.get(i),
635 setStats.get(i));
636 }
637 return success;
638 }
639 finally
640 {
641 cache.unlockWrite();
642 }
643 }
644
645
646 return _baseAccessor.updateChildren(serverPaths, updaters, options);
647 }
648
649
650 @Override
651 public boolean[] exists(List<String> paths, int options)
652 {
653 final int size = paths.size();
654 List<String> serverPaths = prependChroot(paths);
655
656 boolean exists[] = new boolean[size];
657 for (int i = 0; i < size; i++)
658 {
659 exists[i] = exists(serverPaths.get(i), options);
660 }
661 return exists;
662 }
663
664 @Override
665 public boolean[] remove(List<String> paths, int options)
666 {
667 final int size = paths.size();
668 List<String> serverPaths = prependChroot(paths);
669
670 Cache<T> cache = getCache(serverPaths);
671 if (cache != null)
672 {
673 try
674 {
675 cache.lockWrite();
676
677 boolean[] success = _baseAccessor.remove(serverPaths, options);
678
679 for (int i = 0; i < size; i++)
680 {
681 if (success[i])
682 {
683 cache.purgeRecursive(serverPaths.get(i));
684 }
685 }
686 return success;
687 }
688 finally
689 {
690 cache.unlockWrite();
691 }
692 }
693
694
695 return _baseAccessor.remove(serverPaths, options);
696 }
697
698 @Override
699 public List<T> get(List<String> paths, List<Stat> stats, int options)
700 {
701 if (paths == null || paths.isEmpty())
702 {
703 return Collections.emptyList();
704 }
705
706 final int size = paths.size();
707 List<String> serverPaths = prependChroot(paths);
708
709 List<T> records = new ArrayList<T>(Collections.<T> nCopies(size, null));
710 List<Stat> readStats = new ArrayList<Stat>(Collections.<Stat> nCopies(size, null));
711
712 boolean needRead = false;
713 boolean needReads[] = new boolean[size];
714
715 Cache<T> cache = getCache(serverPaths);
716 if (cache != null)
717 {
718 try
719 {
720 cache.lockRead();
721 for (int i = 0; i < size; i++)
722 {
723 ZNode zNode = cache.get(serverPaths.get(i));
724 if (zNode != null)
725 {
726
727 records.set(i, (T) zNode.getData());
728 readStats.set(i, zNode.getStat());
729 }
730 else
731 {
732 needRead = true;
733 needReads[i] = true;
734 }
735 }
736 }
737 finally
738 {
739 cache.unlockRead();
740 }
741
742
743 if (needRead)
744 {
745 cache.lockWrite();
746 try
747 {
748 List<T> readRecords = _baseAccessor.get(serverPaths, readStats, needReads);
749 for (int i = 0; i < size; i++)
750 {
751 if (needReads[i])
752 {
753 records.set(i, readRecords.get(i));
754 cache.update(serverPaths.get(i), readRecords.get(i), readStats.get(i));
755 }
756 }
757 }
758 finally
759 {
760 cache.unlockWrite();
761 }
762 }
763
764 if (stats != null)
765 {
766 stats.clear();
767 stats.addAll(readStats);
768 }
769
770 return records;
771 }
772
773
774 return _baseAccessor.get(serverPaths, stats, options);
775 }
776
777
778 @Override
779 public Stat[] getStats(List<String> paths, int options)
780 {
781 List<String> serverPaths = prependChroot(paths);
782 return _baseAccessor.getStats(serverPaths, options);
783 }
784
785 @Override
786 public List<String> getChildNames(String parentPath, int options)
787 {
788 String serverParentPath = prependChroot(parentPath);
789
790 Cache<T> cache = getCache(serverParentPath);
791 if (cache != null)
792 {
793
794 ZNode znode = cache.get(serverParentPath);
795
796 if (znode != null && znode.getChildSet() != Collections.<String> emptySet())
797 {
798
799 List<String> childNames = new ArrayList<String>(znode.getChildSet());
800 Collections.sort(childNames);
801 return childNames;
802 }
803 else
804 {
805
806 try
807 {
808 cache.lockWrite();
809
810 List<String> childNames =
811 _baseAccessor.getChildNames(serverParentPath, options);
812
813 cache.addToParentChildSet(serverParentPath, childNames);
814
815 return childNames;
816 }
817 finally
818 {
819 cache.unlockWrite();
820 }
821 }
822 }
823
824
825 return _baseAccessor.getChildNames(serverParentPath, options);
826 }
827
828 @Override
829 public List<T> getChildren(String parentPath, List<Stat> stats, int options)
830 {
831 List<String> childNames = getChildNames(parentPath, options);
832 if (childNames == null)
833 {
834 return null;
835 }
836
837 List<String> paths = new ArrayList<String>();
838 for (String childName : childNames)
839 {
840 String path = parentPath + "/" + childName;
841 paths.add(path);
842 }
843
844 return get(paths, stats, options);
845 }
846
847 @Override
848 public void subscribeDataChanges(String path, IZkDataListener listener)
849 {
850 String serverPath = prependChroot(path);
851
852 _baseAccessor.subscribeDataChanges(serverPath, listener);
853 }
854
855 @Override
856 public void unsubscribeDataChanges(String path, IZkDataListener listener)
857 {
858 String serverPath = prependChroot(path);
859
860 _baseAccessor.unsubscribeDataChanges(serverPath, listener);
861 }
862
863 @Override
864 public List<String> subscribeChildChanges(String path, IZkChildListener listener)
865 {
866 String serverPath = prependChroot(path);
867
868 return _baseAccessor.subscribeChildChanges(serverPath, listener);
869 }
870
871 @Override
872 public void unsubscribeChildChanges(String path, IZkChildListener listener)
873 {
874 String serverPath = prependChroot(path);
875
876 _baseAccessor.unsubscribeChildChanges(serverPath, listener);
877 }
878
879 @Override
880 public void subscribe(String parentPath, HelixPropertyListener listener)
881 {
882 String serverPath = prependChroot(parentPath);
883 _zkCache.subscribe(serverPath, listener);
884 }
885
886 @Override
887 public void unsubscribe(String parentPath, HelixPropertyListener listener)
888 {
889 String serverPath = prependChroot(parentPath);
890 _zkCache.unsubscribe(serverPath, listener);
891 }
892
893 @Override
894 public void start()
895 {
896
897 LOG.info("START: Init ZkCacheBaseDataAccessor: " + _chrootPath + ", " + _wtCachePaths
898 + ", " + _zkCachePaths);
899
900
901 try
902 {
903 _eventLock.lockInterruptibly();
904 if (_eventThread != null)
905 {
906 LOG.warn(_eventThread + " has already started");
907 }
908 else
909 {
910
911 if (_zkCachePaths == null || _zkCachePaths.isEmpty())
912 {
913 LOG.warn("ZkCachePaths is null or empty. Will not start ZkCacheEventThread");
914 }
915 else
916 {
917 LOG.debug("Starting ZkCacheEventThread...");
918
919 _eventThread = new ZkCacheEventThread("");
920 _eventThread.start();
921 }
922 }
923 }
924 catch (InterruptedException e)
925 {
926 LOG.error("Current thread is interrupted when starting ZkCacheEventThread. ", e);
927 }
928 finally
929 {
930 _eventLock.unlock();
931 }
932 LOG.debug("Start ZkCacheEventThread...done");
933
934 _wtCache = new WriteThroughCache<T>(_baseAccessor, _wtCachePaths);
935 _zkCache =
936 new ZkCallbackCache<T>(_baseAccessor, _chrootPath, _zkCachePaths, _eventThread);
937
938 if (_wtCachePaths != null && !_wtCachePaths.isEmpty())
939 {
940 for (String path : _wtCachePaths)
941 {
942 _cacheMap.put(path, _wtCache);
943 }
944 }
945
946 if (_zkCachePaths != null && !_zkCachePaths.isEmpty())
947 {
948 for (String path : _zkCachePaths)
949 {
950 _cacheMap.put(path, _zkCache);
951 }
952 }
953 }
954
955 @Override
956 public void stop()
957 {
958 try
959 {
960 _eventLock.lockInterruptibly();
961
962 if (_zkclient != null)
963 {
964 _zkclient.close();
965 _zkclient = null;
966 }
967
968 if (_eventThread == null)
969 {
970 LOG.warn(_eventThread + " has already stopped");
971 return;
972 }
973
974 LOG.debug("Stopping ZkCacheEventThread...");
975 _eventThread.interrupt();
976 _eventThread.join(2000);
977 _eventThread = null;
978 }
979 catch (InterruptedException e)
980 {
981 LOG.error("Current thread is interrupted when stopping ZkCacheEventThread.");
982 }
983 finally
984 {
985 _eventLock.unlock();
986 }
987
988 LOG.debug("Stop ZkCacheEventThread...done");
989
990 }
991
992 @Override
993 public void reset()
994 {
995 if (_wtCache != null)
996 {
997 _wtCache.reset();
998 }
999
1000 if (_zkCache != null)
1001 {
1002 _zkCache.reset();
1003 }
1004 }
1005 }