1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.I0Itec.zkclient.DataUpdater;
28 import org.I0Itec.zkclient.exception.ZkNoNodeException;
29 import org.apache.helix.AccessOption;
30 import org.apache.helix.BaseDataAccessor;
31 import org.apache.helix.ControllerChangeListener;
32 import org.apache.helix.GroupCommit;
33 import org.apache.helix.HelixDataAccessor;
34 import org.apache.helix.HelixException;
35 import org.apache.helix.HelixProperty;
36 import org.apache.helix.InstanceType;
37 import org.apache.helix.NotificationContext;
38 import org.apache.helix.PropertyKey;
39 import org.apache.helix.PropertyKey.Builder;
40 import org.apache.helix.PropertyType;
41 import org.apache.helix.ZNRecord;
42 import org.apache.helix.ZNRecordAssembler;
43 import org.apache.helix.ZNRecordBucketizer;
44 import org.apache.helix.ZNRecordUpdater;
45 import org.apache.helix.controller.restlet.ZNRecordUpdate;
46 import org.apache.helix.controller.restlet.ZNRecordUpdate.OpCode;
47 import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
48 import org.apache.helix.model.LiveInstance;
49 import org.apache.log4j.Logger;
50 import org.apache.zookeeper.data.Stat;
51
52
53 public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeListener
54 {
55 private static Logger LOG =
56 Logger.getLogger(ZKHelixDataAccessor.class);
57 private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
58 final InstanceType _instanceType;
59 private final String _clusterName;
60 private final Builder _propertyKeyBuilder;
61 ZkPropertyTransferClient _zkPropertyTransferClient = null;
62 private final GroupCommit _groupCommit = new GroupCommit();
63 String _zkPropertyTransferSvcUrl = null;
64
/**
 * Creates an accessor for a cluster without specifying an instance type.
 * Delegates to the three-argument constructor with a {@code null} instance type.
 *
 * @param clusterName      name of the cluster whose properties are accessed
 * @param baseDataAccessor low-level accessor all operations are delegated to
 */
public ZKHelixDataAccessor(String clusterName, BaseDataAccessor<ZNRecord> baseDataAccessor)
{
  this(clusterName, (InstanceType) null, baseDataAccessor);
}
70
/**
 * Creates an accessor for a cluster and stores the given collaborators.
 *
 * @param clusterName      name of the cluster; also used to create the property key builder
 * @param instanceType     instance type to remember (may be {@code null})
 * @param baseDataAccessor low-level accessor all operations are delegated to
 */
public ZKHelixDataAccessor(String clusterName,
                           InstanceType instanceType,
                           BaseDataAccessor<ZNRecord> baseDataAccessor)
{
  this._clusterName = clusterName;
  this._instanceType = instanceType;
  this._baseDataAccessor = baseDataAccessor;
  // Builder is the imported PropertyKey.Builder; it receives the same cluster name.
  this._propertyKeyBuilder = new Builder(clusterName);
}
80
81 @Override
82 public <T extends HelixProperty> boolean createProperty(PropertyKey key, T value)
83 {
84 PropertyType type = key.getType();
85 String path = key.getPath();
86 int options = constructOptions(type);
87 return _baseDataAccessor.create(path, value.getRecord(), options);
88 }
89
90 @Override
91 public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value)
92 {
93 PropertyType type = key.getType();
94 if (!value.isValid())
95 {
96 throw new HelixException("The ZNRecord for " + type + " is not valid.");
97 }
98
99 String path = key.getPath();
100 int options = constructOptions(type);
101
102 if (type.usePropertyTransferServer())
103 {
104 if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null)
105 {
106 ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.SET, value.getRecord());
107 _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
108 return true;
109 }
110 }
111
112 boolean success = false;
113 switch (type)
114 {
115 case IDEALSTATES:
116 case EXTERNALVIEW:
117
118 if (value.getBucketSize() > 0)
119 {
120
121 ZNRecord metaRecord = new ZNRecord(value.getId());
122 metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
123 success = _baseDataAccessor.set(path, metaRecord, options);
124 if (success)
125 {
126 ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
127
128 Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
129 List<String> paths = new ArrayList<String>();
130 List<ZNRecord> bucketizedRecords = new ArrayList<ZNRecord>();
131 for (String bucketName : map.keySet())
132 {
133 paths.add(path + "/" + bucketName);
134 bucketizedRecords.add(map.get(bucketName));
135 }
136
137
138 _baseDataAccessor.setChildren(paths, bucketizedRecords, options);
139 }
140 }
141 else
142 {
143 success = _baseDataAccessor.set(path, value.getRecord(), options);
144 }
145 break;
146 default:
147 success = _baseDataAccessor.set(path, value.getRecord(), options);
148 break;
149 }
150 return success;
151 }
152
153 @Override
154 public <T extends HelixProperty> boolean updateProperty(PropertyKey key, T value)
155 {
156 PropertyType type = key.getType();
157 String path = key.getPath();
158 int options = constructOptions(type);
159
160 boolean success = false;
161 switch (type)
162 {
163 case CURRENTSTATES:
164 success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
165 break;
166 default:
167 if (type.usePropertyTransferServer())
168 {
169 if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null)
170 {
171 ZNRecordUpdate update =
172 new ZNRecordUpdate(path, OpCode.UPDATE, value.getRecord());
173 _zkPropertyTransferClient.enqueueZNRecordUpdate(update,
174 _zkPropertyTransferSvcUrl);
175
176 return true;
177 }
178 else
179 {
180 if(LOG.isTraceEnabled()){
181 LOG.trace("getPropertyTransferUrl is null, skip updating the value");
182 }
183 return true;
184 }
185 }
186 success =
187 _baseDataAccessor.update(path, new ZNRecordUpdater(value.getRecord()), options);
188 break;
189 }
190 return success;
191 }
192
193 @Override
194 public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys)
195 {
196 if (keys == null || keys.size() == 0)
197 {
198 return Collections.emptyList();
199 }
200
201 List<T> childValues = new ArrayList<T>();
202
203
204 List<String> paths = new ArrayList<String>();
205 for (PropertyKey key : keys)
206 {
207 paths.add(key.getPath());
208 }
209 List<ZNRecord> children = _baseDataAccessor.get(paths, null, 0);
210
211
212 for (int i = 0; i < keys.size(); i++)
213 {
214 PropertyKey key = keys.get(i);
215 ZNRecord record = children.get(i);
216
217 PropertyType type = key.getType();
218 String path = key.getPath();
219 int options = constructOptions(type);
220
221
222 switch (type)
223 {
224 case CURRENTSTATES:
225 case IDEALSTATES:
226 case EXTERNALVIEW:
227
228 if (record != null)
229 {
230 HelixProperty property = new HelixProperty(record);
231
232 int bucketSize = property.getBucketSize();
233 if (bucketSize > 0)
234 {
235 List<ZNRecord> childRecords =
236 _baseDataAccessor.getChildren(path, null, options);
237 ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
238
239
240 if (assembledRecord != null)
241 {
242 record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
243 record.getListFields().putAll(assembledRecord.getListFields());
244 record.getMapFields().putAll(assembledRecord.getMapFields());
245 }
246 }
247 }
248 break;
249 default:
250 break;
251 }
252
253 @SuppressWarnings("unchecked")
254 T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
255 childValues.add(t);
256 }
257
258 return childValues;
259 }
260
261 @Override
262 public <T extends HelixProperty> T getProperty(PropertyKey key)
263 {
264 PropertyType type = key.getType();
265 String path = key.getPath();
266 int options = constructOptions(type);
267 ZNRecord record = null;
268 try
269 {
270 Stat stat = new Stat();
271 record = _baseDataAccessor.get(path, stat, options);
272 if (record != null)
273 {
274 record.setCreationTime(stat.getCtime());
275 record.setModifiedTime(stat.getMtime());
276 }
277 }
278 catch (ZkNoNodeException e)
279 {
280
281 }
282
283 switch (type)
284 {
285 case CURRENTSTATES:
286 case IDEALSTATES:
287 case EXTERNALVIEW:
288
289 if (record != null)
290 {
291 HelixProperty property = new HelixProperty(record);
292
293 int bucketSize = property.getBucketSize();
294 if (bucketSize > 0)
295 {
296 List<ZNRecord> childRecords =
297 _baseDataAccessor.getChildren(path, null, options);
298 ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
299
300
301 if (assembledRecord != null)
302 {
303 record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
304 record.getListFields().putAll(assembledRecord.getListFields());
305 record.getMapFields().putAll(assembledRecord.getMapFields());
306 }
307 }
308 }
309 break;
310 default:
311 break;
312 }
313
314 @SuppressWarnings("unchecked")
315 T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
316 return t;
317 }
318
319 @Override
320 public boolean removeProperty(PropertyKey key)
321 {
322 PropertyType type = key.getType();
323 String path = key.getPath();
324 int options = constructOptions(type);
325
326 return _baseDataAccessor.remove(path, options);
327 }
328
329 @Override
330 public List<String> getChildNames(PropertyKey key)
331 {
332 PropertyType type = key.getType();
333 String parentPath = key.getPath();
334 int options = constructOptions(type);
335 List<String> childNames = _baseDataAccessor.getChildNames(parentPath, options);
336 if (childNames == null)
337 {
338 childNames = Collections.emptyList();
339 }
340 return childNames;
341 }
342
343 @Override
344 public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
345 {
346 PropertyType type = key.getType();
347 String parentPath = key.getPath();
348 int options = constructOptions(type);
349 List<T> childValues = new ArrayList<T>();
350
351 List<ZNRecord> children = _baseDataAccessor.getChildren(parentPath, null, options);
352 if (children != null)
353 {
354 for (ZNRecord record : children)
355 {
356 switch (type)
357 {
358 case CURRENTSTATES:
359 case IDEALSTATES:
360 case EXTERNALVIEW:
361 if (record != null)
362 {
363 HelixProperty property = new HelixProperty(record);
364
365 int bucketSize = property.getBucketSize();
366 if (bucketSize > 0)
367 {
368
369 String childPath = parentPath + "/" + record.getId();
370 List<ZNRecord> childRecords =
371 _baseDataAccessor.getChildren(childPath, null, options);
372 ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
373
374
375 if (assembledRecord != null)
376 {
377 record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
378 record.getListFields().putAll(assembledRecord.getListFields());
379 record.getMapFields().putAll(assembledRecord.getMapFields());
380 }
381 }
382 }
383
384 break;
385 default:
386 break;
387 }
388
389 if (record != null)
390 {
391 @SuppressWarnings("unchecked")
392 T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
393 childValues.add(t);
394 }
395 }
396 }
397 return childValues;
398 }
399
400 @Override
401 public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key)
402 {
403 PropertyType type = key.getType();
404 String parentPath = key.getPath();
405 int options = constructOptions(type);
406 List<T> children = getChildValues(key);
407 Map<String, T> childValuesMap = new HashMap<String, T>();
408 for (T t : children)
409 {
410 childValuesMap.put(t.getRecord().getId(), t);
411 }
412 return childValuesMap;
413
414 }
415
416 @Override
417 public Builder keyBuilder()
418 {
419 return _propertyKeyBuilder;
420 }
421
422 private int constructOptions(PropertyType type)
423 {
424 int options = 0;
425 if (type.isPersistent())
426 {
427 options = options | AccessOption.PERSISTENT;
428 }
429 else
430 {
431 options = options | AccessOption.EPHEMERAL;
432 }
433
434 return options;
435 }
436
437 @Override
438 public <T extends HelixProperty> boolean[] createChildren(List<PropertyKey> keys,
439 List<T> children)
440 {
441
442 int options = -1;
443 List<String> paths = new ArrayList<String>();
444 List<ZNRecord> records = new ArrayList<ZNRecord>();
445 for (int i = 0; i < keys.size(); i++)
446 {
447 PropertyKey key = keys.get(i);
448 PropertyType type = key.getType();
449 String path = key.getPath();
450 paths.add(path);
451 HelixProperty value = children.get(i);
452 records.add(value.getRecord());
453 options = constructOptions(type);
454 }
455 return _baseDataAccessor.createChildren(paths, records, options);
456 }
457
458 @Override
459 public <T extends HelixProperty> boolean[] setChildren(List<PropertyKey> keys,
460 List<T> children)
461 {
462 int options = -1;
463 List<String> paths = new ArrayList<String>();
464 List<ZNRecord> records = new ArrayList<ZNRecord>();
465
466 List<List<String>> bucketizedPaths =
467 new ArrayList<List<String>>(Collections.<List<String>> nCopies(keys.size(), null));
468 List<List<ZNRecord>> bucketizedRecords =
469 new ArrayList<List<ZNRecord>>(Collections.<List<ZNRecord>> nCopies(keys.size(),
470 null));
471
472 for (int i = 0; i < keys.size(); i++)
473 {
474 PropertyKey key = keys.get(i);
475 PropertyType type = key.getType();
476 String path = key.getPath();
477 paths.add(path);
478 options = constructOptions(type);
479
480 HelixProperty value = children.get(i);
481
482 switch (type)
483 {
484 case EXTERNALVIEW:
485 if (value.getBucketSize() == 0)
486 {
487 records.add(value.getRecord());
488 }
489 else
490 {
491 _baseDataAccessor.remove(path, options);
492
493 ZNRecord metaRecord = new ZNRecord(value.getId());
494 metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
495 records.add(metaRecord);
496
497 ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
498
499 Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
500 List<String> childBucketizedPaths = new ArrayList<String>();
501 List<ZNRecord> childBucketizedRecords = new ArrayList<ZNRecord>();
502 for (String bucketName : map.keySet())
503 {
504 childBucketizedPaths.add(path + "/" + bucketName);
505 childBucketizedRecords.add(map.get(bucketName));
506 }
507 bucketizedPaths.set(i, childBucketizedPaths);
508 bucketizedRecords.set(i, childBucketizedRecords);
509 }
510 break;
511 default:
512 records.add(value.getRecord());
513 break;
514 }
515 }
516
517
518 boolean success[] = _baseDataAccessor.setChildren(paths, records, options);
519
520
521 List<String> allBucketizedPaths = new ArrayList<String>();
522 List<ZNRecord> allBucketizedRecords = new ArrayList<ZNRecord>();
523
524 for (int i = 0; i < keys.size(); i++)
525 {
526 if (success[i] && bucketizedPaths.get(i) != null)
527 {
528 allBucketizedPaths.addAll(bucketizedPaths.get(i));
529 allBucketizedRecords.addAll(bucketizedRecords.get(i));
530 }
531 }
532
533
534 _baseDataAccessor.setChildren(allBucketizedPaths, allBucketizedRecords, options);
535
536 return success;
537 }
538
539 @Override
540 public BaseDataAccessor<ZNRecord> getBaseDataAccessor()
541 {
542 return _baseDataAccessor;
543 }
544
545 @Override
546 public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
547 List<DataUpdater<ZNRecord>> updaters,
548 int options)
549 {
550 return _baseDataAccessor.updateChildren(paths, updaters, options);
551 }
552
553 public void shutdown()
554 {
555 if (_zkPropertyTransferClient != null)
556 {
557 _zkPropertyTransferClient.shutdown();
558 }
559 }
560
561 @Override
562 public void onControllerChange(NotificationContext changeContext)
563 {
564 LOG.info("Controller has changed");
565 refreshZkPropertyTransferUrl();
566 if (_zkPropertyTransferClient == null)
567 {
568 if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferSvcUrl.length() > 0)
569 {
570 LOG.info("Creating ZkPropertyTransferClient as we get url "
571 + _zkPropertyTransferSvcUrl);
572 _zkPropertyTransferClient =
573 new ZkPropertyTransferClient(ZkPropertyTransferClient.DEFAULT_MAX_CONCURRENTTASKS);
574 }
575 }
576 }
577
578 void refreshZkPropertyTransferUrl()
579 {
580 try
581 {
582 LiveInstance leader = getProperty(keyBuilder().controllerLeader());
583 if (leader != null)
584 {
585 _zkPropertyTransferSvcUrl = leader.getWebserviceUrl();
586 LOG.info("_zkPropertyTransferSvcUrl : " + _zkPropertyTransferSvcUrl
587 + " Controller " + leader.getInstanceName());
588 }
589 else
590 {
591 _zkPropertyTransferSvcUrl = null;
592 }
593 }
594 catch (Exception e)
595 {
596
597 _zkPropertyTransferSvcUrl = null;
598 }
599 }
600 }