1 package org.apache.helix.controller;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.FileFilter;
23 import java.util.LinkedList;
24 import java.util.List;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.atomic.AtomicLong;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.apache.helix.manager.zk.ZkClient;
30 import org.apache.log4j.Logger;
31 import org.apache.zookeeper.data.Stat;
32
33
34
35
36
37
38 public class HierarchicalDataHolder<T>
39 {
40 private static final Logger logger = Logger
41 .getLogger(HierarchicalDataHolder.class.getName());
42 AtomicReference<Node<T>> root;
43
44
45
46
47 AtomicLong currentVersion;
48 private final ZkClient _zkClient;
49 private final String _rootPath;
50 private final FileFilter _filter;
51
52 public HierarchicalDataHolder(ZkClient client, String rootPath,
53 FileFilter filter)
54 {
55 this._zkClient = client;
56 this._rootPath = rootPath;
57 this._filter = filter;
58
59 root = new AtomicReference<HierarchicalDataHolder.Node<T>>();
60 currentVersion = new AtomicLong(1);
61 refreshData();
62 }
63
64 public long getVersion()
65 {
66 return currentVersion.get();
67 }
68
69 public boolean refreshData()
70 {
71 Node<T> newRoot = new Node<T>();
72 boolean dataChanged = refreshRecursively(root.get(), newRoot, _rootPath);
73 if (dataChanged)
74 {
75 currentVersion.getAndIncrement();
76 root.set(newRoot);
77 return true;
78 } else
79 {
80 return false;
81 }
82 }
83
84
85
86 private boolean refreshRecursively(Node<T> oldRoot, Node<T> newRoot,
87 String path)
88 {
89 boolean dataChanged = false;
90 Stat newStat = _zkClient.getStat(path);
91 Stat oldStat = (oldRoot != null) ? oldRoot.stat : null;
92 newRoot.name = path;
93 if (newStat != null)
94 {
95 if (oldStat == null)
96 {
97 newRoot.stat = newStat;
98 newRoot.data = _zkClient.<T> readData(path, true);
99 dataChanged = true;
100 } else if (newStat.equals(oldStat))
101 {
102 newRoot.stat = oldStat;
103 newRoot.data = oldRoot.data;
104 } else
105 {
106 dataChanged = true;
107 newRoot.stat = newStat;
108 newRoot.data = _zkClient.<T> readData(path, true);
109 }
110 if (newStat.getNumChildren() > 0)
111 {
112 List<String> children = _zkClient.getChildren(path);
113 for (String child : children)
114 {
115 String newPath = path + "/" + child;
116 Node<T> oldChild = (oldRoot != null && oldRoot.children != null) ? oldRoot.children
117 .get(child) : null;
118 if (newRoot.children == null)
119 {
120 newRoot.children = new ConcurrentHashMap<String, HierarchicalDataHolder.Node<T>>();
121 }
122 if (!newRoot.children.contains(child))
123 {
124 newRoot.children.put(child, new Node<T>());
125 }
126 Node<T> newChild = newRoot.children.get(child);
127 boolean childChanged = refreshRecursively(oldChild, newChild, newPath);
128 dataChanged = dataChanged || childChanged;
129 }
130 }
131 } else
132 {
133 logger.info(path + " does not exist");
134 }
135 return dataChanged;
136 }
137
138 static class Node<T>
139 {
140 String name;
141 Stat stat;
142 T data;
143 ConcurrentHashMap<String, Node<T>> children;
144
145 }
146
147 public void print()
148 {
149 logger.info("START "+ _rootPath);
150 LinkedList<Node<T>> stack = new LinkedList<HierarchicalDataHolder.Node<T>>();
151 stack.push(root.get());
152 while (!stack.isEmpty())
153 {
154 Node<T> pop = stack.pop();
155 if (pop != null)
156 {
157 logger.info("name:"+ pop.name);
158 logger.info("\tdata:"+pop.data);
159 if (pop.children != null)
160 {
161 for (Node<T> child : pop.children.values())
162 {
163 stack.push(child);
164 }
165 }
166 }
167 }
168 logger.info("END "+ _rootPath);
169 }
170 }