View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   
4   /*
5    * Licensed to the Apache Software Foundation (ASF) under one
6    * or more contributor license agreements.  See the NOTICE file
7    * distributed with this work for additional information
8    * regarding copyright ownership.  The ASF licenses this file
9    * to you under the Apache License, Version 2.0 (the
10   * "License"); you may not use this file except in compliance
11   * with the License.  You may obtain a copy of the License at
12   *
13   *   http://www.apache.org/licenses/LICENSE-2.0
14   *
15   * Unless required by applicable law or agreed to in writing,
16   * software distributed under the License is distributed on an
17   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
18   * KIND, either express or implied.  See the License for the
19   * specific language governing permissions and limitations
20   * under the License.
21   */
22  
23  import java.io.File;
24  import java.util.List;
25  
26  import org.I0Itec.zkclient.exception.ZkNoNodeException;
27  import org.apache.helix.AccessOption;
28  import org.apache.helix.BaseDataAccessor;
29  import org.apache.helix.store.zk.ZNode;
30  import org.apache.helix.util.HelixUtil;
31  import org.apache.log4j.Logger;
32  import org.apache.zookeeper.data.Stat;
33  
34  
35  public class WriteThroughCache<T> extends Cache<T>
36  {
37    private static Logger     LOG = Logger.getLogger(WriteThroughCache.class);
38  
39    final BaseDataAccessor<T> _accessor;
40  
41    public WriteThroughCache(BaseDataAccessor<T> accessor, List<String> paths)
42    {
43      super();
44      _accessor = accessor;
45  
46      // init cache
47      if (paths != null && !paths.isEmpty())
48      {
49        for (String path : paths)
50        {
51          updateRecursive(path);
52        }
53      }
54    }
55  
56    @Override
57    public void update(String path, T data, Stat stat)
58    {
59      String parentPath = HelixUtil.getZkParentPath(path);
60      String childName = HelixUtil.getZkName(path);
61      addToParentChildSet(parentPath, childName);
62      
63      ZNode znode = _cache.get(path);
64      if (znode == null)
65      {
66        _cache.put(path, new ZNode(path, data, stat));
67      }
68      else
69      {
70        znode.setData(data);
71        znode.setStat(stat);
72      }
73    }
74    
75    @Override
76    public void updateRecursive(String path)
77    {
78      if (path == null)
79      {
80        return;
81      }
82  
83      try
84      {
85        _lock.writeLock().lock();
86  
87        // update this node
88        Stat stat = new Stat();
89        T readData = _accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
90  
91        update(path, readData, stat);
92        
93        // recursively update children nodes if not exists
94        ZNode znode = _cache.get(path);
95        List<String> childNames = _accessor.getChildNames(path, 0);
96        if (childNames != null && childNames.size() > 0)
97        {
98          for (String childName : childNames)
99          {
100           String childPath = path + "/" + childName;
101           if (!znode.hasChild(childName))
102           {
103             znode.addChild(childName);
104             updateRecursive(childPath);
105           }
106         }
107       }
108     }
109     catch (ZkNoNodeException e)
110     {
111       // OK. someone delete znode while we are updating cache
112     }
113     finally
114     {
115       _lock.writeLock().unlock();
116     }
117   }
118 }