1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25
26 import org.I0Itec.zkclient.DataUpdater;
27 import org.apache.helix.InstanceType;
28 import org.apache.helix.PropertyPathConfig;
29 import org.apache.helix.PropertyType;
30 import org.apache.helix.ZNRecord;
31 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
32 import org.apache.log4j.Logger;
33 import org.apache.zookeeper.CreateMode;
34 import org.apache.zookeeper.data.Stat;
35
36
37 public final class ZKUtil
38 {
39 private static Logger logger = Logger.getLogger(ZKUtil.class);
40 private static int RETRYLIMIT = 3;
41
42 private ZKUtil()
43 {
44 }
45
46 public static boolean isClusterSetup(String clusterName, ZkClient zkClient)
47 {
48 if (clusterName == null || zkClient == null)
49 {
50 return false;
51 }
52 ArrayList<String> requiredPaths = new ArrayList<String>();
53 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName));
54 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS,
55 clusterName,
56 ConfigScopeProperty.CLUSTER.toString(),
57 clusterName));
58 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS,
59 clusterName,
60 ConfigScopeProperty.PARTICIPANT.toString()));
61 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS,
62 clusterName,
63 ConfigScopeProperty.RESOURCE.toString()));
64 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE,clusterName));
65 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES,clusterName));
66 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.INSTANCES,clusterName));
67 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,clusterName));
68 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONTROLLER, clusterName));
69 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,clusterName));
70 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName));
71 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName));
72 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName));
73 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.HISTORY,clusterName));
74 boolean isValid =true;
75
76 for(String path:requiredPaths){
77 if(!zkClient.exists(path)){
78 isValid = false;
79 logger.info("Invalid cluster setup, missing znode path: " + path);
80 }
81 }
82 return isValid;
83 }
84
85 public static boolean isInstanceSetup(ZkClient zkclient, String clusterName, String instanceName, InstanceType type)
86 {
87 if (type == InstanceType.PARTICIPANT || type == InstanceType.CONTROLLER_PARTICIPANT)
88 {
89 ArrayList<String> requiredPaths = new ArrayList<String>();
90 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS,
91 clusterName,
92 ConfigScopeProperty.PARTICIPANT.toString(),
93 instanceName));
94 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES,
95 clusterName,
96 instanceName));
97 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
98 clusterName,
99 instanceName));
100 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES,
101 clusterName,
102 instanceName));
103 requiredPaths.add(PropertyPathConfig.getPath(PropertyType.ERRORS,
104 clusterName,
105 instanceName));
106 boolean isValid =true;
107
108 for(String path:requiredPaths){
109 if(!zkclient.exists(path)){
110 isValid =false;
111 logger.info("Invalid instance setup, missing znode path: " + path);
112 }
113 }
114 return isValid;
115 }
116
117 return true;
118 }
119
120 public static void createChildren(ZkClient client,
121 String parentPath,
122 List<ZNRecord> list)
123 {
124 client.createPersistent(parentPath, true);
125 if (list != null)
126 {
127 for (ZNRecord record : list)
128 {
129 createChildren(client, parentPath, record);
130 }
131 }
132 }
133
134 public static void createChildren(ZkClient client,
135 String parentPath,
136 ZNRecord nodeRecord)
137 {
138 client.createPersistent(parentPath, true);
139
140 String id = nodeRecord.getId();
141 String temp = parentPath + "/" + id;
142 client.createPersistent(temp, nodeRecord);
143 }
144
145 public static void dropChildren(ZkClient client, String parentPath, List<ZNRecord> list)
146 {
147
148 if (list != null)
149 {
150 for (ZNRecord record : list)
151 {
152 dropChildren(client, parentPath, record);
153 }
154 }
155 }
156
157 public static void dropChildren(ZkClient client, String parentPath, ZNRecord nodeRecord)
158 {
159
160 String id = nodeRecord.getId();
161 String temp = parentPath + "/" + id;
162 client.deleteRecursive(temp);
163 }
164
165 public static List<ZNRecord> getChildren(ZkClient client, String path)
166 {
167
168 List<String> children = client.getChildren(path);
169 if (children == null || children.size() == 0)
170 {
171 return Collections.emptyList();
172 }
173
174 List<ZNRecord> childRecords = new ArrayList<ZNRecord>();
175 for (String child : children)
176 {
177 String childPath = path + "/" + child;
178 Stat newStat = new Stat();
179 ZNRecord record = client.readDataAndStat(childPath, newStat, true);
180 if (record != null)
181 {
182 record.setVersion(newStat.getVersion());
183 record.setCreationTime(newStat.getCtime());
184 record.setModifiedTime(newStat.getMtime());
185 childRecords.add(record);
186 }
187 }
188 return childRecords;
189 }
190
191 public static void updateIfExists(ZkClient client,
192 String path,
193 final ZNRecord record,
194 boolean mergeOnUpdate)
195 {
196 if (client.exists(path))
197 {
198 DataUpdater<Object> updater = new DataUpdater<Object>()
199 {
200 @Override
201 public Object update(Object currentData)
202 {
203 return record;
204 }
205 };
206 client.updateDataSerialized(path, updater);
207 }
208 }
209
210 public static void createOrUpdate(ZkClient client,
211 String path,
212 final ZNRecord record,
213 final boolean persistent,
214 final boolean mergeOnUpdate)
215 {
216 int retryCount = 0;
217 while (retryCount < RETRYLIMIT)
218 {
219 try
220 {
221 if (client.exists(path))
222 {
223 DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
224 {
225 @Override
226 public ZNRecord update(ZNRecord currentData)
227 {
228 if (currentData != null && mergeOnUpdate)
229 {
230 currentData.merge(record);
231 return currentData;
232 }
233 return record;
234 }
235 };
236 client.updateDataSerialized(path, updater);
237 }
238 else
239 {
240 CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
241 if (record.getDeltaList().size() > 0)
242 {
243 ZNRecord value = new ZNRecord(record.getId());
244 value.merge(record);
245 client.create(path, value, mode);
246 }
247 else
248 {
249 client.create(path, record, mode);
250 }
251 }
252 break;
253 }
254 catch (Exception e)
255 {
256 retryCount = retryCount + 1;
257 logger.warn("Exception trying to update " + path + " Exception:" + e.getMessage()
258 + ". Will retry.");
259 }
260 }
261 }
262
263 public static void asyncCreateOrUpdate(ZkClient client,
264 String path,
265 final ZNRecord record,
266 final boolean persistent,
267 final boolean mergeOnUpdate)
268 {
269 try
270 {
271 if (client.exists(path))
272 {
273 if (mergeOnUpdate)
274 {
275 ZNRecord curRecord = client.readData(path);
276 if (curRecord != null)
277 {
278 curRecord.merge(record);
279 client.asyncSetData(path, curRecord, -1, null);
280 }
281 else
282 {
283 client.asyncSetData(path, record, -1, null);
284 }
285 }
286 else
287 {
288 client.asyncSetData(path, record, -1, null);
289 }
290 }
291 else
292 {
293 CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
294 if (record.getDeltaList().size() > 0)
295 {
296 ZNRecord newRecord = new ZNRecord(record.getId());
297 newRecord.merge(record);
298 client.create(path, null, mode);
299
300 client.asyncSetData(path, newRecord, -1, null);
301 }
302 else
303 {
304 client.create(path, null, mode);
305
306 client.asyncSetData(path, record, -1, null);
307 }
308 }
309 }
310 catch (Exception e)
311 {
312 logger.error("Exception in async create or update " + path + ". Exception: "
313 + e.getMessage() + ". Give up.");
314 }
315 }
316
317 public static void createOrReplace(ZkClient client,
318 String path,
319 final ZNRecord record,
320 final boolean persistent)
321 {
322 int retryCount = 0;
323 while (retryCount < RETRYLIMIT)
324 {
325 try
326 {
327 if (client.exists(path))
328 {
329 DataUpdater<Object> updater = new DataUpdater<Object>()
330 {
331 @Override
332 public Object update(Object currentData)
333 {
334 return record;
335 }
336 };
337 client.updateDataSerialized(path, updater);
338 }
339 else
340 {
341 CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
342 client.create(path, record, mode);
343 }
344 break;
345 }
346 catch (Exception e)
347 {
348 retryCount = retryCount + 1;
349 logger.warn("Exception trying to createOrReplace " + path + " Exception:"
350 + e.getMessage() + ". Will retry.");
351 }
352 }
353 }
354
355 public static void subtract(ZkClient client,
356 String path,
357 final ZNRecord recordTosubtract)
358 {
359 int retryCount = 0;
360 while (retryCount < RETRYLIMIT)
361 {
362 try
363 {
364 if (client.exists(path))
365 {
366 DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
367 {
368 @Override
369 public ZNRecord update(ZNRecord currentData)
370 {
371 currentData.subtract(recordTosubtract);
372 return currentData;
373 }
374 };
375 client.updateDataSerialized(path, updater);
376 break;
377 }
378 }
379 catch (Exception e)
380 {
381 retryCount = retryCount + 1;
382 logger.warn("Exception trying to createOrReplace " + path + " Exception:"
383 + e.getMessage() + ". Will retry.");
384 e.printStackTrace();
385 }
386 }
387
388 }
389 }