View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collections;
25  import java.util.Comparator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.TreeMap;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.locks.ReentrantLock;
31  
32  import org.I0Itec.zkclient.DataUpdater;
33  import org.I0Itec.zkclient.IZkChildListener;
34  import org.I0Itec.zkclient.IZkDataListener;
35  import org.I0Itec.zkclient.exception.ZkNoNodeException;
36  import org.I0Itec.zkclient.serialize.ZkSerializer;
37  import org.apache.helix.AccessOption;
38  import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
39  import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
40  import org.apache.helix.store.HelixPropertyListener;
41  import org.apache.helix.store.HelixPropertyStore;
42  import org.apache.helix.store.zk.ZNode;
43  import org.apache.log4j.Logger;
44  import org.apache.zookeeper.KeeperException.Code;
45  import org.apache.zookeeper.common.PathUtils;
46  import org.apache.zookeeper.data.Stat;
47  import org.apache.zookeeper.server.DataTree;
48  
49  
50  public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
51  {
52    private static final Logger    LOG          =
53                                                    Logger.getLogger(ZkCacheBaseDataAccessor.class);
54  
55    protected WriteThroughCache<T> _wtCache;
56    protected ZkCallbackCache<T>   _zkCache;
57  
58    final ZkBaseDataAccessor<T>    _baseAccessor;
59    final Map<String, Cache<T>>    _cacheMap;
60  
61    final String                   _chrootPath;
62    final List<String>             _wtCachePaths;
63    final List<String>             _zkCachePaths;
64  
65    final HelixGroupCommit<T>      _groupCommit = new HelixGroupCommit<T>();
66  
67    // fire listeners
68    private final ReentrantLock    _eventLock   = new ReentrantLock();
69    private ZkCacheEventThread     _eventThread;
70  
71    private ZkClient               _zkclient    = null;
72  
73    public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
74                                   List<String> wtCachePaths)
75    {
76      this(baseAccessor, null, wtCachePaths, null);
77    }
78  
79    public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor,
80                                   String chrootPath,
81                                   List<String> wtCachePaths,
82                                   List<String> zkCachePaths)
83    {
84      _baseAccessor = baseAccessor;
85  
86      if (chrootPath == null || chrootPath.equals("/"))
87      {
88        _chrootPath = null;
89      }
90      else
91      {
92        PathUtils.validatePath(chrootPath);
93        _chrootPath = chrootPath;
94      }
95  
96      _wtCachePaths = wtCachePaths;
97      _zkCachePaths = zkCachePaths;
98  
99      // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
100     // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
101     // comes first
102     _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
103     {
104       @Override
105       public int compare(String o1, String o2)
106       {
107         int len1 = o1.split("/").length;
108         int len2 = o2.split("/").length;
109         return len1 - len2;
110       }
111     });
112 
113     start();
114   }
115 
116   public ZkCacheBaseDataAccessor(String zkAddress,
117                                  ZkSerializer serializer,
118                                  String chrootPath,
119                                  List<String> wtCachePaths,
120                                  List<String> zkCachePaths)
121   {
122     _zkclient =
123         new ZkClient(zkAddress,
124                      ZkClient.DEFAULT_SESSION_TIMEOUT,
125                      ZkClient.DEFAULT_CONNECTION_TIMEOUT,
126                      serializer);
127     _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT,
128                                  TimeUnit.MILLISECONDS);
129     _baseAccessor = new ZkBaseDataAccessor<T>(_zkclient);
130 
131     if (chrootPath == null || chrootPath.equals("/"))
132     {
133       _chrootPath = null;
134     }
135     else
136     {
137       PathUtils.validatePath(chrootPath);
138       _chrootPath = chrootPath;
139     }
140 
141     _wtCachePaths = wtCachePaths;
142     _zkCachePaths = zkCachePaths;
143 
144     // TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
145     // TreeMap key is ordered by key string length, so more general (i.e. short) prefix
146     // comes first
147     _cacheMap = new TreeMap<String, Cache<T>>(new Comparator<String>()
148     {
149       @Override
150       public int compare(String o1, String o2)
151       {
152         int len1 = o1.split("/").length;
153         int len2 = o2.split("/").length;
154         return len1 - len2;
155       }
156     });
157 
158     start();
159   }
160 
161   private String prependChroot(String clientPath)
162   {
163     PathUtils.validatePath(clientPath);
164 
165     if (_chrootPath != null)
166     {
167       // handle clientPath = "/"
168       if (clientPath.length() == 1)
169       {
170         return _chrootPath;
171       }
172       return _chrootPath + clientPath;
173     }
174     else
175     {
176       return clientPath;
177     }
178   }
179 
180   private List<String> prependChroot(List<String> clientPaths)
181   {
182     List<String> serverPaths = new ArrayList<String>();
183     for (String clientPath : clientPaths)
184     {
185       serverPaths.add(prependChroot(clientPath));
186     }
187     return serverPaths;
188   }
189 
190   /**
191    * find the first path in paths that is a descendant
192    */
193   private String firstCachePath(List<String> paths)
194   {
195     for (String cachePath : _cacheMap.keySet())
196     {
197       for (String path : paths)
198       {
199         if (path.startsWith(cachePath))
200         {
201           return path;
202         }
203       }
204     }
205     return null;
206   }
207 
208   private Cache<T> getCache(String path)
209   {
210     for (String cachePath : _cacheMap.keySet())
211     {
212       if (path.startsWith(cachePath))
213       {
214         return _cacheMap.get(cachePath);
215       }
216     }
217 
218     return null;
219   }
220 
221   private Cache<T> getCache(List<String> paths)
222   {
223     Cache<T> cache = null;
224     for (String path : paths)
225     {
226       for (String cachePath : _cacheMap.keySet())
227       {
228         if (cache == null && path.startsWith(cachePath))
229         {
230           cache = _cacheMap.get(cachePath);
231         }
232         else if (cache != null && cache != _cacheMap.get(cachePath))
233         {
234           throw new IllegalArgumentException("Couldn't do cross-cache async operations. paths: "
235               + paths);
236         }
237       }
238     }
239 
240     return cache;
241   }
242 
243   private void updateCache(Cache<T> cache,
244                            List<String> createPaths,
245                            boolean success,
246                            String updatePath,
247                            T data,
248                            Stat stat)
249   {
250     if (createPaths == null || createPaths.isEmpty())
251     {
252       if (success)
253       {
254         cache.update(updatePath, data, stat);
255       }
256     }
257     else
258     {
259       String firstPath = firstCachePath(createPaths);
260       if (firstPath != null)
261       {
262         cache.updateRecursive(firstPath);
263       }
264     }
265   }
266 
267   @Override
268   public boolean create(String path, T data, int options)
269   {
270     String clientPath = path;
271     String serverPath = prependChroot(clientPath);
272 
273     Cache<T> cache = getCache(serverPath);
274     if (cache != null)
275     {
276       try
277       {
278         cache.lockWrite();
279         List<String> pathsCreated = new ArrayList<String>();
280         RetCode rc = _baseAccessor.create(serverPath, data, pathsCreated, options);
281         boolean success = (rc == RetCode.OK);
282 
283         updateCache(cache, pathsCreated, success, serverPath, data, ZNode.ZERO_STAT);
284 
285         return success;
286       }
287       finally
288       {
289         cache.unlockWrite();
290       }
291     }
292 
293     // no cache
294     return _baseAccessor.create(serverPath, data, options);
295   }
296 
297   @Override
298   public boolean set(String path, T data, int options)
299   {
300     String clientPath = path;
301     String serverPath = prependChroot(clientPath);
302 
303     Cache<T> cache = getCache(serverPath);
304     if (cache != null)
305     {
306       try
307       {
308         cache.lockWrite();
309         Stat setStat = new Stat();
310         List<String> pathsCreated = new ArrayList<String>();
311         boolean success =
312             _baseAccessor.set(serverPath, data, pathsCreated, setStat, -1, options);
313 
314         updateCache(cache, pathsCreated, success, serverPath, data, setStat);
315 
316         return success;
317       }
318       finally
319       {
320         cache.unlockWrite();
321       }
322     }
323 
324     // no cache
325     return _baseAccessor.set(serverPath, data, options);
326   }
327 
328   @Override
329   public boolean update(String path, DataUpdater<T> updater, int options)
330   {
331     String clientPath = path;
332     String serverPath = prependChroot(clientPath);
333 
334     Cache<T> cache = getCache(serverPath);
335 
336     if (cache != null)
337     {
338       try
339       {
340         cache.lockWrite();
341         Stat setStat = new Stat();
342         List<String> pathsCreated = new ArrayList<String>();
343         T updateData =
344             _baseAccessor.update(serverPath, updater, pathsCreated, setStat, options);
345         boolean success = (updateData != null);
346         updateCache(cache, pathsCreated, success, serverPath, updateData, setStat);
347 
348         return success;
349       }
350       finally
351       {
352         cache.unlockWrite();
353       }
354     }
355 
356     // no cache
357     return _groupCommit.commit(_baseAccessor, options, serverPath, updater);
358     // return _baseAccessor.update(serverPath, updater, options);
359   }
360 
361   @Override
362   public boolean exists(String path, int options)
363   {
364     String clientPath = path;
365     String serverPath = prependChroot(clientPath);
366 
367     Cache<T> cache = getCache(serverPath);
368     if (cache != null)
369     {
370       boolean exists = cache.exists(serverPath);
371       if (exists)
372       {
373         return true;
374       }
375     }
376 
377     // if not exists in cache, always fall back to zk
378     return _baseAccessor.exists(serverPath, options);
379   }
380 
381   @Override
382   public boolean remove(String path, int options)
383   {
384     String clientPath = path;
385     String serverPath = prependChroot(clientPath);
386 
387     Cache<T> cache = getCache(serverPath);
388     if (cache != null)
389     {
390       try
391       {
392         cache.lockWrite();
393 
394         boolean success = _baseAccessor.remove(serverPath, options);
395         if (success)
396         {
397           cache.purgeRecursive(serverPath);
398         }
399 
400         return success;
401       }
402       finally
403       {
404         cache.unlockWrite();
405       }
406     }
407 
408     // no cache
409     return _baseAccessor.remove(serverPath, options);
410   }
411 
412   @Override
413   public T get(String path, Stat stat, int options)
414   {
415     String clientPath = path;
416     String serverPath = prependChroot(clientPath);
417 
418     Cache<T> cache = getCache(serverPath);
419     if (cache != null)
420     {
421       T record = null;
422       ZNode znode = cache.get(serverPath);
423 
424       if (znode != null)
425       {
426         // TODO: shall return a deep copy instead of reference
427         record = ((T) znode.getData());
428         if (stat != null)
429         {
430           DataTree.copyStat(znode.getStat(), stat);
431         }
432         return record;
433 
434       }
435       else
436       {
437         // if cache miss, fall back to zk and update cache
438         try
439         {
440           cache.lockWrite();
441           record = _baseAccessor.get(serverPath, stat, options | AccessOption.THROW_EXCEPTION_IFNOTEXIST);
442           cache.update(serverPath, record, stat);
443         }
444         catch (ZkNoNodeException e)
445         {
446           if (AccessOption.isThrowExceptionIfNotExist(options))
447           {
448             throw e;
449           }
450         }
451         finally
452         {
453           cache.unlockWrite();
454         }
455 
456         return record;
457       }
458     }
459 
460     // no cache
461     return _baseAccessor.get(serverPath, stat, options);
462   }
463 
464   @Override
465   public Stat getStat(String path, int options)
466   {
467     String clientPath = path;
468     String serverPath = prependChroot(clientPath);
469 
470     Cache<T> cache = getCache(serverPath);
471     if (cache != null)
472     {
473       Stat stat = new Stat();
474       ZNode znode = cache.get(serverPath);
475 
476       if (znode != null)
477       {
478         return znode.getStat();
479 
480       }
481       else
482       {
483         // if cache miss, fall back to zk and update cache
484         try
485         {
486           cache.lockWrite();
487           T data = _baseAccessor.get(serverPath, stat, options);
488           cache.update(serverPath, data, stat);
489         }
490         catch (ZkNoNodeException e)
491         {
492           return null;
493         }
494         finally
495         {
496           cache.unlockWrite();
497         }
498 
499         return stat;
500       }
501     }
502 
503     // no cache
504     return _baseAccessor.getStat(serverPath, options);
505   }
506 
507   @Override
508   public boolean[] createChildren(List<String> paths, List<T> records, int options)
509   {
510     final int size = paths.size();
511     List<String> serverPaths = prependChroot(paths);
512 
513     Cache<T> cache = getCache(serverPaths);
514     if (cache != null)
515     {
516       try
517       {
518         cache.lockWrite();
519         boolean[] needCreate = new boolean[size];
520         Arrays.fill(needCreate, true);
521         List<List<String>> pathsCreatedList =
522             new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
523         CreateCallbackHandler[] createCbList =
524             _baseAccessor.create(serverPaths,
525                                  records,
526                                  needCreate,
527                                  pathsCreatedList,
528                                  options);
529 
530         boolean[] success = new boolean[size];
531         for (int i = 0; i < size; i++)
532         {
533           CreateCallbackHandler cb = createCbList[i];
534           success[i] = (Code.get(cb.getRc()) == Code.OK);
535 
536           updateCache(cache,
537                       pathsCreatedList.get(i),
538                       success[i],
539                       serverPaths.get(i),
540                       records.get(i),
541                       ZNode.ZERO_STAT);
542         }
543 
544         return success;
545       }
546       finally
547       {
548         cache.unlockWrite();
549       }
550     }
551 
552     // no cache
553     return _baseAccessor.createChildren(serverPaths, records, options);
554   }
555 
556   @Override
557   public boolean[] setChildren(List<String> paths, List<T> records, int options)
558   {
559     final int size = paths.size();
560     List<String> serverPaths = prependChroot(paths);
561 
562     Cache<T> cache = getCache(serverPaths);
563     if (cache != null)
564     {
565       try
566       {
567         cache.lockWrite();
568         List<Stat> setStats = new ArrayList<Stat>();
569         List<List<String>> pathsCreatedList =
570             new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
571         boolean[] success =
572             _baseAccessor.set(serverPaths, records, pathsCreatedList, setStats, options);
573 
574         for (int i = 0; i < size; i++)
575         {
576           updateCache(cache,
577                       pathsCreatedList.get(i),
578                       success[i],
579                       serverPaths.get(i),
580                       records.get(i),
581                       setStats.get(i));
582         }
583 
584         return success;
585       }
586       finally
587       {
588         cache.unlockWrite();
589       }
590     }
591 
592     return _baseAccessor.setChildren(serverPaths, records, options);
593   }
594 
595   @Override
596   public boolean[] updateChildren(List<String> paths,
597                                   List<DataUpdater<T>> updaters,
598                                   int options)
599   {
600     final int size = paths.size();
601     List<String> serverPaths = prependChroot(paths);
602 
603     Cache<T> cache = getCache(serverPaths);
604     if (cache != null)
605     {
606       try
607       {
608         cache.lockWrite();
609 
610         List<Stat> setStats = new ArrayList<Stat>();
611         boolean[] success = new boolean[size];
612         List<List<String>> pathsCreatedList =
613             new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
614         List<T> updateData =
615             _baseAccessor.update(serverPaths,
616                                  updaters,
617                                  pathsCreatedList,
618                                  setStats,
619                                  options);
620 
621         // System.out.println("updateChild: ");
622         // for (T data : updateData)
623         // {
624         // System.out.println(data);
625         // }
626 
627         for (int i = 0; i < size; i++)
628         {
629           success[i] = (updateData.get(i) != null);
630           updateCache(cache,
631                       pathsCreatedList.get(i),
632                       success[i],
633                       serverPaths.get(i),
634                       updateData.get(i),
635                       setStats.get(i));
636         }
637         return success;
638       }
639       finally
640       {
641         cache.unlockWrite();
642       }
643     }
644 
645     // no cache
646     return _baseAccessor.updateChildren(serverPaths, updaters, options);
647   }
648 
649   // TODO: change to use async_exists
650   @Override
651   public boolean[] exists(List<String> paths, int options)
652   {
653     final int size = paths.size();
654     List<String> serverPaths = prependChroot(paths);
655 
656     boolean exists[] = new boolean[size];
657     for (int i = 0; i < size; i++)
658     {
659       exists[i] = exists(serverPaths.get(i), options);
660     }
661     return exists;
662   }
663 
664   @Override
665   public boolean[] remove(List<String> paths, int options)
666   {
667     final int size = paths.size();
668     List<String> serverPaths = prependChroot(paths);
669 
670     Cache<T> cache = getCache(serverPaths);
671     if (cache != null)
672     {
673       try
674       {
675         cache.lockWrite();
676 
677         boolean[] success = _baseAccessor.remove(serverPaths, options);
678 
679         for (int i = 0; i < size; i++)
680         {
681           if (success[i])
682           {
683             cache.purgeRecursive(serverPaths.get(i));
684           }
685         }
686         return success;
687       }
688       finally
689       {
690         cache.unlockWrite();
691       }
692     }
693 
694     // no cache
695     return _baseAccessor.remove(serverPaths, options);
696   }
697 
698   @Override
699   public List<T> get(List<String> paths, List<Stat> stats, int options)
700   {
701     if (paths == null || paths.isEmpty())
702     {
703       return Collections.emptyList();
704     }
705 
706     final int size = paths.size();
707     List<String> serverPaths = prependChroot(paths);
708 
709     List<T> records = new ArrayList<T>(Collections.<T> nCopies(size, null));
710     List<Stat> readStats = new ArrayList<Stat>(Collections.<Stat> nCopies(size, null));
711 
712     boolean needRead = false;
713     boolean needReads[] = new boolean[size]; // init to false
714 
715     Cache<T> cache = getCache(serverPaths);
716     if (cache != null)
717     {
718       try
719       {
720         cache.lockRead();
721         for (int i = 0; i < size; i++)
722         {
723           ZNode zNode = cache.get(serverPaths.get(i));
724           if (zNode != null)
725           {
726             // TODO: shall return a deep copy instead of reference
727             records.set(i, (T) zNode.getData());
728             readStats.set(i, zNode.getStat());
729           }
730           else
731           {
732             needRead = true;
733             needReads[i] = true;
734           }
735         }
736       }
737       finally
738       {
739         cache.unlockRead();
740       }
741 
742       // cache miss, fall back to zk and update cache
743       if (needRead)
744       {
745         cache.lockWrite();
746         try
747         {
748           List<T> readRecords = _baseAccessor.get(serverPaths, readStats, needReads);
749           for (int i = 0; i < size; i++)
750           {
751             if (needReads[i])
752             {
753               records.set(i, readRecords.get(i));
754               cache.update(serverPaths.get(i), readRecords.get(i), readStats.get(i));
755             }
756           }
757         }
758         finally
759         {
760           cache.unlockWrite();
761         }
762       }
763 
764       if (stats != null)
765       {
766         stats.clear();
767         stats.addAll(readStats);
768       }
769 
770       return records;
771     }
772 
773     // no cache
774     return _baseAccessor.get(serverPaths, stats, options);
775   }
776 
777   // TODO: add cache
778   @Override
779   public Stat[] getStats(List<String> paths, int options)
780   {
781     List<String> serverPaths = prependChroot(paths);
782     return _baseAccessor.getStats(serverPaths, options);
783   }
784 
785   @Override
786   public List<String> getChildNames(String parentPath, int options)
787   {
788     String serverParentPath = prependChroot(parentPath);
789 
790     Cache<T> cache = getCache(serverParentPath);
791     if (cache != null)
792     {
793       // System.out.println("zk-cache");
794       ZNode znode = cache.get(serverParentPath);
795 
796       if (znode != null && znode.getChildSet() != Collections.<String> emptySet())
797       {
798         // System.out.println("zk-cache-hit: " + parentPath);
799         List<String> childNames = new ArrayList<String>(znode.getChildSet());
800         Collections.sort(childNames);
801         return childNames;
802       }
803       else
804       {
805         // System.out.println("zk-cache-miss");
806         try
807         {
808           cache.lockWrite();
809 
810           List<String> childNames =
811               _baseAccessor.getChildNames(serverParentPath, options);
812           // System.out.println("\t--" + childNames);
813           cache.addToParentChildSet(serverParentPath, childNames);
814 
815           return childNames;
816         }
817         finally
818         {
819           cache.unlockWrite();
820         }
821       }
822     }
823 
824     // no cache
825     return _baseAccessor.getChildNames(serverParentPath, options);
826   }
827 
828   @Override
829   public List<T> getChildren(String parentPath, List<Stat> stats, int options)
830   {
831     List<String> childNames = getChildNames(parentPath, options);
832     if (childNames == null)
833     {
834       return null;
835     }
836 
837     List<String> paths = new ArrayList<String>();
838     for (String childName : childNames)
839     {
840       String path = parentPath + "/" + childName;
841       paths.add(path);
842     }
843 
844     return get(paths, stats, options);
845   }
846 
847   @Override
848   public void subscribeDataChanges(String path, IZkDataListener listener)
849   {
850     String serverPath = prependChroot(path);
851 
852     _baseAccessor.subscribeDataChanges(serverPath, listener);
853   }
854 
855   @Override
856   public void unsubscribeDataChanges(String path, IZkDataListener listener)
857   {
858     String serverPath = prependChroot(path);
859 
860     _baseAccessor.unsubscribeDataChanges(serverPath, listener);
861   }
862 
863   @Override
864   public List<String> subscribeChildChanges(String path, IZkChildListener listener)
865   {
866     String serverPath = prependChroot(path);
867 
868     return _baseAccessor.subscribeChildChanges(serverPath, listener);
869   }
870 
871   @Override
872   public void unsubscribeChildChanges(String path, IZkChildListener listener)
873   {
874     String serverPath = prependChroot(path);
875 
876     _baseAccessor.unsubscribeChildChanges(serverPath, listener);
877   }
878 
879   @Override
880   public void subscribe(String parentPath, HelixPropertyListener listener)
881   {
882     String serverPath = prependChroot(parentPath);
883     _zkCache.subscribe(serverPath, listener);
884   }
885 
886   @Override
887   public void unsubscribe(String parentPath, HelixPropertyListener listener)
888   {
889     String serverPath = prependChroot(parentPath);
890     _zkCache.unsubscribe(serverPath, listener);
891   }
892 
893   @Override
894   public void start()
895   {
896 
897     LOG.info("START: Init ZkCacheBaseDataAccessor: " + _chrootPath + ", " + _wtCachePaths
898         + ", " + _zkCachePaths);
899 
900     // start event thread
901     try
902     {
903       _eventLock.lockInterruptibly();
904       if (_eventThread != null)
905       {
906         LOG.warn(_eventThread + " has already started");
907       }
908       else
909       {
910 
911         if (_zkCachePaths == null || _zkCachePaths.isEmpty())
912         {
913           LOG.warn("ZkCachePaths is null or empty. Will not start ZkCacheEventThread");
914         }
915         else
916         {
917           LOG.debug("Starting ZkCacheEventThread...");
918 
919           _eventThread = new ZkCacheEventThread("");
920           _eventThread.start();
921         }
922       }
923     }
924     catch (InterruptedException e)
925     {
926       LOG.error("Current thread is interrupted when starting ZkCacheEventThread. ", e);
927     }
928     finally
929     {
930       _eventLock.unlock();
931     }
932     LOG.debug("Start ZkCacheEventThread...done");
933 
934     _wtCache = new WriteThroughCache<T>(_baseAccessor, _wtCachePaths);
935     _zkCache =
936         new ZkCallbackCache<T>(_baseAccessor, _chrootPath, _zkCachePaths, _eventThread);
937 
938     if (_wtCachePaths != null && !_wtCachePaths.isEmpty())
939     {
940       for (String path : _wtCachePaths)
941       {
942         _cacheMap.put(path, _wtCache);
943       }
944     }
945 
946     if (_zkCachePaths != null && !_zkCachePaths.isEmpty())
947     {
948       for (String path : _zkCachePaths)
949       {
950         _cacheMap.put(path, _zkCache);
951       }
952     }
953   }
954 
955   @Override
956   public void stop()
957   {
958     try
959     {
960       _eventLock.lockInterruptibly();
961 
962       if (_zkclient != null)
963       {
964         _zkclient.close();
965         _zkclient = null;
966       }
967 
968       if (_eventThread == null)
969       {
970         LOG.warn(_eventThread + " has already stopped");
971         return;
972       }
973 
974       LOG.debug("Stopping ZkCacheEventThread...");
975       _eventThread.interrupt();
976       _eventThread.join(2000);
977       _eventThread = null;
978     }
979     catch (InterruptedException e)
980     {
981       LOG.error("Current thread is interrupted when stopping ZkCacheEventThread.");
982     }
983     finally
984     {
985       _eventLock.unlock();
986     }
987 
988     LOG.debug("Stop ZkCacheEventThread...done");
989 
990   }
991   
992   @Override
993   public void reset()
994   {
995     if (_wtCache != null)
996     {
997       _wtCache.reset();
998     }
999     
1000     if (_zkCache != null)
1001     {
1002       _zkCache.reset();
1003     }
1004   }
1005 }