1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
23 import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
24 import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
25 import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
26 import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
27 import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
28 import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
29
30 import java.util.List;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 import org.I0Itec.zkclient.IZkChildListener;
34 import org.I0Itec.zkclient.IZkDataListener;
35 import org.I0Itec.zkclient.exception.ZkNoNodeException;
36 import org.apache.helix.BaseDataAccessor;
37 import org.apache.helix.ConfigChangeListener;
38 import org.apache.helix.ControllerChangeListener;
39 import org.apache.helix.CurrentStateChangeListener;
40 import org.apache.helix.ExternalViewChangeListener;
41 import org.apache.helix.HealthStateChangeListener;
42 import org.apache.helix.HelixConstants.ChangeType;
43 import org.apache.helix.HelixDataAccessor;
44 import org.apache.helix.HelixManager;
45 import org.apache.helix.HelixProperty;
46 import org.apache.helix.IdealStateChangeListener;
47 import org.apache.helix.InstanceConfigChangeListener;
48 import org.apache.helix.InstanceType;
49 import org.apache.helix.LiveInstanceChangeListener;
50 import org.apache.helix.MessageListener;
51 import org.apache.helix.NotificationContext;
52 import org.apache.helix.NotificationContext.Type;
53 import org.apache.helix.PropertyKey;
54 import org.apache.helix.PropertyKey.Builder;
55 import org.apache.helix.PropertyPathConfig;
56 import org.apache.helix.ScopedConfigChangeListener;
57 import org.apache.helix.ZNRecord;
58 import org.apache.helix.model.CurrentState;
59 import org.apache.helix.model.ExternalView;
60 import org.apache.helix.model.HealthStat;
61 import org.apache.helix.model.IdealState;
62 import org.apache.helix.model.InstanceConfig;
63 import org.apache.helix.model.LiveInstance;
64 import org.apache.helix.model.Message;
65 import org.apache.log4j.Logger;
66 import org.apache.zookeeper.Watcher.Event.EventType;
67
68 public class CallbackHandler implements IZkChildListener, IZkDataListener
69
70 {
71
72 private static Logger logger = Logger.getLogger(CallbackHandler.class);
73
74 private final String _path;
75 private final Object _listener;
76 private final EventType[] _eventTypes;
77 private final HelixDataAccessor _accessor;
78 private final ChangeType _changeType;
79 private final ZkClient _zkClient;
80 private final AtomicLong _lastNotificationTimeStamp;
81 private final HelixManager _manager;
82 private final PropertyKey _propertyKey;
83
84 public CallbackHandler(HelixManager manager,
85 ZkClient client,
86 PropertyKey propertyKey,
87 Object listener,
88 EventType[] eventTypes,
89 ChangeType changeType)
90 {
91 this._manager = manager;
92 this._accessor = manager.getHelixDataAccessor();
93 this._zkClient = client;
94 this._propertyKey = propertyKey;
95 this._path = propertyKey.getPath();
96 this._listener = listener;
97 this._eventTypes = eventTypes;
98 this._changeType = changeType;
99 this._lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
100 init();
101 }
102
103 public Object getListener()
104 {
105 return _listener;
106 }
107
108 public String getPath()
109 {
110 return _path;
111 }
112
113 public void invoke(NotificationContext changeContext) throws Exception
114 {
115
116 synchronized (_manager)
117 {
118
119 long start = System.currentTimeMillis();
120 if (logger.isInfoEnabled())
121 {
122 logger.info(Thread.currentThread().getId() + " START:INVOKE "
123 + _path + " listener:" + _listener.getClass().getCanonicalName());
124 }
125
126 if (_changeType == IDEAL_STATE)
127 {
128
129 IdealStateChangeListener idealStateChangeListener =
130 (IdealStateChangeListener) _listener;
131 subscribeForChanges(changeContext, _path, true, true);
132 List<IdealState> idealStates = _accessor.getChildValues(_propertyKey);
133
134 idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
135
136 }
137 else if (_changeType == ChangeType.INSTANCE_CONFIG)
138 {
139 subscribeForChanges(changeContext, _path, true, true);
140 if (_listener instanceof ConfigChangeListener)
141 {
142 ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
143 List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
144 configChangeListener.onConfigChange(configs, changeContext);
145 } else if (_listener instanceof InstanceConfigChangeListener)
146 {
147 InstanceConfigChangeListener listener = (InstanceConfigChangeListener) _listener;
148 List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
149 listener.onInstanceConfigChange(configs, changeContext);
150 }
151 }
152 else if (_changeType == CONFIG)
153 {
154 subscribeForChanges(changeContext, _path, true, true);
155 ScopedConfigChangeListener listener = (ScopedConfigChangeListener) _listener;
156 List<HelixProperty> configs = _accessor.getChildValues(_propertyKey);
157 listener.onConfigChange(configs, changeContext);
158 }
159 else if (_changeType == LIVE_INSTANCE)
160 {
161 LiveInstanceChangeListener liveInstanceChangeListener =
162 (LiveInstanceChangeListener) _listener;
163 subscribeForChanges(changeContext, _path, true, true);
164 List<LiveInstance> liveInstances =
165 _accessor.getChildValues(_propertyKey);
166
167 liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
168
169 }
170 else if (_changeType == CURRENT_STATE)
171 {
172 CurrentStateChangeListener currentStateChangeListener = (CurrentStateChangeListener) _listener;
173 subscribeForChanges(changeContext, _path, true, true);
174 String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
175
176 List<CurrentState> currentStates = _accessor.getChildValues(_propertyKey);
177
178 currentStateChangeListener.onStateChange(instanceName,
179 currentStates,
180 changeContext);
181
182 }
183 else if (_changeType == MESSAGE)
184 {
185 MessageListener messageListener = (MessageListener) _listener;
186 subscribeForChanges(changeContext, _path, true, false);
187 String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
188 List<Message> messages =
189 _accessor.getChildValues(_propertyKey);
190
191 messageListener.onMessage(instanceName, messages, changeContext);
192
193 }
194 else if (_changeType == MESSAGES_CONTROLLER)
195 {
196 MessageListener messageListener = (MessageListener) _listener;
197 subscribeForChanges(changeContext, _path, true, false);
198 List<Message> messages =
199 _accessor.getChildValues(_propertyKey);
200
201 messageListener.onMessage(_manager.getInstanceName(), messages, changeContext);
202
203 }
204 else if (_changeType == EXTERNAL_VIEW)
205 {
206 ExternalViewChangeListener externalViewListener =
207 (ExternalViewChangeListener) _listener;
208 subscribeForChanges(changeContext, _path, true, true);
209 List<ExternalView> externalViewList =
210 _accessor.getChildValues(_propertyKey);
211
212 externalViewListener.onExternalViewChange(externalViewList, changeContext);
213 }
214 else if (_changeType == ChangeType.CONTROLLER)
215 {
216 ControllerChangeListener controllerChangelistener =
217 (ControllerChangeListener) _listener;
218 subscribeForChanges(changeContext, _path, true, false);
219 controllerChangelistener.onControllerChange(changeContext);
220 }
221 else if (_changeType == ChangeType.HEALTH)
222 {
223 HealthStateChangeListener healthStateChangeListener =
224 (HealthStateChangeListener) _listener;
225 subscribeForChanges(changeContext, _path, true, true);
226
227 String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
228
229 List<HealthStat> healthReportList =
230 _accessor.getChildValues(_propertyKey);
231
232 healthStateChangeListener.onHealthChange(instanceName,
233 healthReportList,
234 changeContext);
235 }
236
237 long end = System.currentTimeMillis();
238 if (logger.isInfoEnabled())
239 {
240 logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path
241 + " listener:" + _listener.getClass().getCanonicalName() + " Took: "
242 + (end - start) +"ms");
243 }
244 }
245 }
246
247 private void subscribeChildChange(String path, NotificationContext context)
248 {
249 NotificationContext.Type type = context.getType();
250 if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK)
251 {
252 logger.info(_manager.getInstanceName() + " subscribes child-change. path: "
253 + path + ", listener: " + _listener);
254 _zkClient.subscribeChildChanges(path, this);
255 }
256 else if (type == NotificationContext.Type.FINALIZE)
257 {
258 logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: "
259 + path + ", listener: " + _listener);
260
261 _zkClient.unsubscribeChildChanges(path, this);
262 }
263 }
264
265 private void subscribeDataChange(String path, NotificationContext context)
266 {
267 NotificationContext.Type type = context.getType();
268 if (type == NotificationContext.Type.INIT
269 || type == NotificationContext.Type.CALLBACK)
270 {
271 if (logger.isDebugEnabled())
272 {
273 logger.debug(_manager.getInstanceName() + " subscribe data-change. path: "
274 + path + ", listener: " + _listener);
275 }
276 _zkClient.subscribeDataChanges(path, this);
277
278 }
279 else if (type == NotificationContext.Type.FINALIZE)
280 {
281 logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: "
282 + path + ", listener: " + _listener);
283
284 _zkClient.unsubscribeDataChanges(path, this);
285 }
286 }
287
288
289 private void subscribeForChanges(NotificationContext context,
290 String path,
291 boolean watchParent,
292 boolean watchChild)
293 {
294 if (watchParent)
295 {
296 subscribeChildChange(path, context);
297 }
298
299 if (watchChild)
300 {
301 try
302 {
303 switch(_changeType)
304 {
305 case CURRENT_STATE:
306 case IDEAL_STATE:
307 case EXTERNAL_VIEW:
308 {
309
310 BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
311 List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
312 for (ZNRecord record : records)
313 {
314 HelixProperty property = new HelixProperty(record);
315 String childPath = path + "/" + record.getId();
316
317 int bucketSize = property.getBucketSize();
318 if (bucketSize > 0)
319 {
320
321
322 subscribeChildChange(childPath, context);
323 subscribeDataChange(childPath, context);
324
325
326 List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
327 if (bucketizedChildNames != null)
328 {
329 for (String bucketizedChildName : bucketizedChildNames)
330 {
331 String bucketizedChildPath = childPath + "/" + bucketizedChildName;
332 subscribeDataChange(bucketizedChildPath, context);
333 }
334 }
335 } else
336 {
337 subscribeDataChange(childPath, context);
338 }
339 }
340 break;
341 }
342 default:
343 {
344 List<String> childNames = _zkClient.getChildren(path);
345 if (childNames != null)
346 {
347 for (String childName : childNames)
348 {
349 String childPath = path + "/" + childName;
350 subscribeDataChange(childPath, context);
351 }
352 }
353 break;
354 }
355 }
356 }
357 catch (ZkNoNodeException e)
358 {
359 logger.warn("fail to subscribe child/data change. path: " + path
360 + ", listener: " + _listener, e);
361 }
362 }
363
364 }
365
366 public EventType[] getEventTypes()
367 {
368 return _eventTypes;
369 }
370
371
372
373
374
375
376 public void init()
377 {
378 updateNotificationTime(System.nanoTime());
379 try
380 {
381 NotificationContext changeContext = new NotificationContext(_manager);
382 changeContext.setType(NotificationContext.Type.INIT);
383 invoke(changeContext);
384 }
385 catch (Exception e)
386 {
387 String msg = "Exception while invoking init callback for listener:"+ _listener;
388 ZKExceptionHandler.getInstance().handle(msg, e);
389 }
390 }
391
392 @Override
393 public void handleDataChange(String dataPath, Object data)
394 {
395 try
396 {
397 updateNotificationTime(System.nanoTime());
398 if (dataPath != null && dataPath.startsWith(_path))
399 {
400 NotificationContext changeContext = new NotificationContext(_manager);
401 changeContext.setType(NotificationContext.Type.CALLBACK);
402 invoke(changeContext);
403 }
404 }
405 catch (Exception e)
406 {
407 String msg = "exception in handling data-change. path: " + dataPath
408 + ", listener: " + _listener;
409 ZKExceptionHandler.getInstance().handle(msg, e);
410 }
411 }
412
413 @Override
414 public void handleDataDeleted(String dataPath)
415 {
416 try
417 {
418 updateNotificationTime(System.nanoTime());
419 if (dataPath != null && dataPath.startsWith(_path))
420 {
421 logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: "
422 + dataPath + ", listener: " + _listener);
423 _zkClient.unsubscribeDataChanges(dataPath, this);
424
425
426
427 logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: "
428 + dataPath + ", listener: " + _listener);
429 _zkClient.unsubscribeChildChanges(dataPath, this);
430
431
432
433
434 }
435 }
436 catch (Exception e)
437 {
438 String msg = "exception in handling data-delete-change. path: " + dataPath
439 + ", listener: " + _listener;
440 ZKExceptionHandler.getInstance().handle(msg, e);
441 }
442 }
443
444 @Override
445 public void handleChildChange(String parentPath, List<String> currentChilds)
446 {
447 try
448 {
449 updateNotificationTime(System.nanoTime());
450 if (parentPath != null && parentPath.startsWith(_path))
451 {
452 NotificationContext changeContext = new NotificationContext(_manager);
453
454 if (currentChilds == null) {
455
456 if (parentPath.equals(_path)) {
457
458 _manager.removeListener(_propertyKey, _listener);
459 }
460 changeContext.setType(NotificationContext.Type.FINALIZE);
461 } else {
462 changeContext.setType(NotificationContext.Type.CALLBACK);
463 }
464 invoke(changeContext);
465 }
466 }
467 catch (Exception e)
468 {
469 String msg = "exception in handling child-change. instance: " + _manager.getInstanceName()
470 + ", parentPath: " + parentPath + ", listener: " + _listener;
471 ZKExceptionHandler.getInstance().handle(msg, e);
472 }
473 }
474
475
476
477
478
479 public void reset()
480 {
481 try
482 {
483 NotificationContext changeContext = new NotificationContext(_manager);
484 changeContext.setType(NotificationContext.Type.FINALIZE);
485 invoke(changeContext);
486 }
487 catch (Exception e)
488 {
489 String msg = "Exception while resetting the listener:"+_listener;
490 ZKExceptionHandler.getInstance().handle(msg, e);
491 }
492 }
493
494 private void updateNotificationTime(long nanoTime)
495 {
496 long l = _lastNotificationTimeStamp.get();
497 while (nanoTime > l)
498 {
499 boolean b = _lastNotificationTimeStamp.compareAndSet(l, nanoTime);
500 if (b)
501 {
502 break;
503 }
504 else
505 {
506 l = _lastNotificationTimeStamp.get();
507 }
508 }
509 }
510
511 }