View Javadoc

1   package org.apache.helix.controller;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.FileFilter;
23  import java.util.LinkedList;
24  import java.util.List;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.atomic.AtomicLong;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import org.apache.helix.manager.zk.ZkClient;
30  import org.apache.log4j.Logger;
31  import org.apache.zookeeper.data.Stat;
32  
33  
34  /**
35   * Generic class that will read the data given the root path.
36   * 
37   */
38  public class HierarchicalDataHolder<T>
39  {
40    private static final Logger logger = Logger
41        .getLogger(HierarchicalDataHolder.class.getName());
42    AtomicReference<Node<T>> root;
43  
44    /**
45     * currentVersion, gets updated when data is read from original source
46     */
47    AtomicLong currentVersion;
48    private final ZkClient _zkClient;
49    private final String _rootPath;
50    private final FileFilter _filter;
51  
52    public HierarchicalDataHolder(ZkClient client, String rootPath,
53        FileFilter filter)
54    {
55      this._zkClient = client;
56      this._rootPath = rootPath;
57      this._filter = filter;
58      // Node<T> initialValue = new Node<T>();
59      root = new AtomicReference<HierarchicalDataHolder.Node<T>>();
60      currentVersion = new AtomicLong(1);
61      refreshData();
62    }
63  
64    public long getVersion()
65    {
66      return currentVersion.get();
67    }
68  
69    public boolean refreshData()
70    {
71      Node<T> newRoot = new Node<T>();
72      boolean dataChanged = refreshRecursively(root.get(), newRoot, _rootPath);
73      if (dataChanged)
74      {
75        currentVersion.getAndIncrement();
76        root.set(newRoot);
77        return true;
78      } else
79      {
80        return false;
81      }
82    }
83  
84    // private void refreshRecursively(Node<T> oldRoot, Stat oldStat, Node<T>
85    // newRoot,Stat newStat, String path)
86    private boolean refreshRecursively(Node<T> oldRoot, Node<T> newRoot,
87        String path)
88    {
89      boolean dataChanged = false;
90      Stat newStat = _zkClient.getStat(path);
91      Stat oldStat = (oldRoot != null) ? oldRoot.stat : null;
92      newRoot.name = path;
93      if (newStat != null)
94      {
95        if (oldStat == null)
96        {
97          newRoot.stat = newStat;
98          newRoot.data = _zkClient.<T> readData(path, true);
99          dataChanged = true;
100       } else if (newStat.equals(oldStat))
101       {
102         newRoot.stat = oldStat;
103         newRoot.data = oldRoot.data;
104       } else
105       {
106         dataChanged = true;
107         newRoot.stat = newStat;
108         newRoot.data = _zkClient.<T> readData(path, true);
109       }
110       if (newStat.getNumChildren() > 0)
111       {
112         List<String> children = _zkClient.getChildren(path);
113         for (String child : children)
114         {
115           String newPath = path + "/" + child;
116           Node<T> oldChild = (oldRoot != null && oldRoot.children != null) ? oldRoot.children
117               .get(child) : null;
118           if (newRoot.children == null)
119           {
120             newRoot.children = new ConcurrentHashMap<String, HierarchicalDataHolder.Node<T>>();
121           }
122           if (!newRoot.children.contains(child))
123           {
124             newRoot.children.put(child, new Node<T>());
125           }
126           Node<T> newChild = newRoot.children.get(child);
127           boolean childChanged = refreshRecursively(oldChild, newChild, newPath);
128           dataChanged = dataChanged || childChanged;
129         }
130       }
131     } else
132     {
133       logger.info(path + " does not exist");
134     }
135     return dataChanged;
136   }
137 
138   static class Node<T>
139   {
140     String name;
141     Stat stat;
142     T data;
143     ConcurrentHashMap<String, Node<T>> children;
144 
145   }
146 
147   public void print()
148   {
149     logger.info("START "+ _rootPath);
150     LinkedList<Node<T>> stack = new LinkedList<HierarchicalDataHolder.Node<T>>();
151     stack.push(root.get());
152     while (!stack.isEmpty())
153     {
154       Node<T> pop = stack.pop();
155       if (pop != null)
156       {
157         logger.info("name:"+ pop.name);
158         logger.info("\tdata:"+pop.data);
159         if (pop.children != null)
160         {
161           for (Node<T> child : pop.children.values())
162           {
163             stack.push(child);
164           }
165         }
166       }
167     }
168     logger.info("END "+ _rootPath);
169   }
170 }