View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import static org.apache.helix.HelixConstants.ChangeType.CONFIG;
23  import static org.apache.helix.HelixConstants.ChangeType.CURRENT_STATE;
24  import static org.apache.helix.HelixConstants.ChangeType.EXTERNAL_VIEW;
25  import static org.apache.helix.HelixConstants.ChangeType.IDEAL_STATE;
26  import static org.apache.helix.HelixConstants.ChangeType.LIVE_INSTANCE;
27  import static org.apache.helix.HelixConstants.ChangeType.MESSAGE;
28  import static org.apache.helix.HelixConstants.ChangeType.MESSAGES_CONTROLLER;
29  
30  import java.util.List;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.I0Itec.zkclient.IZkChildListener;
34  import org.I0Itec.zkclient.IZkDataListener;
35  import org.I0Itec.zkclient.exception.ZkNoNodeException;
36  import org.apache.helix.BaseDataAccessor;
37  import org.apache.helix.ConfigChangeListener;
38  import org.apache.helix.ControllerChangeListener;
39  import org.apache.helix.CurrentStateChangeListener;
40  import org.apache.helix.ExternalViewChangeListener;
41  import org.apache.helix.HealthStateChangeListener;
42  import org.apache.helix.HelixConstants.ChangeType;
43  import org.apache.helix.HelixDataAccessor;
44  import org.apache.helix.HelixManager;
45  import org.apache.helix.HelixProperty;
46  import org.apache.helix.IdealStateChangeListener;
47  import org.apache.helix.InstanceConfigChangeListener;
48  import org.apache.helix.InstanceType;
49  import org.apache.helix.LiveInstanceChangeListener;
50  import org.apache.helix.MessageListener;
51  import org.apache.helix.NotificationContext;
52  import org.apache.helix.NotificationContext.Type;
53  import org.apache.helix.PropertyKey;
54  import org.apache.helix.PropertyKey.Builder;
55  import org.apache.helix.PropertyPathConfig;
56  import org.apache.helix.ScopedConfigChangeListener;
57  import org.apache.helix.ZNRecord;
58  import org.apache.helix.model.CurrentState;
59  import org.apache.helix.model.ExternalView;
60  import org.apache.helix.model.HealthStat;
61  import org.apache.helix.model.IdealState;
62  import org.apache.helix.model.InstanceConfig;
63  import org.apache.helix.model.LiveInstance;
64  import org.apache.helix.model.Message;
65  import org.apache.log4j.Logger;
66  import org.apache.zookeeper.Watcher.Event.EventType;
67  
68  public class CallbackHandler implements IZkChildListener, IZkDataListener
69  
70  {
71  
72    private static Logger           logger = Logger.getLogger(CallbackHandler.class);
73  
74    private final String            _path;
75    private final Object            _listener;
76    private final EventType[]       _eventTypes;
77    private final HelixDataAccessor _accessor;
78    private final ChangeType        _changeType;
79    private final ZkClient          _zkClient;
80    private final AtomicLong        _lastNotificationTimeStamp;
81    private final HelixManager      _manager;
82    private final PropertyKey 	  _propertyKey;
83  
84    public CallbackHandler(HelixManager manager,
85                           ZkClient client,
86                           PropertyKey propertyKey,
87                           Object listener,
88                           EventType[] eventTypes,
89                           ChangeType changeType)
90    {
91      this._manager = manager;
92      this._accessor = manager.getHelixDataAccessor();
93      this._zkClient = client;
94      this._propertyKey = propertyKey;
95      this._path = propertyKey.getPath();
96      this._listener = listener;
97      this._eventTypes = eventTypes;
98      this._changeType = changeType;
99      this._lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
100     init();
101   }
102 
103   public Object getListener()
104   {
105     return _listener;
106   }
107 
108   public String getPath()
109   {
110     return _path;
111   }
112 
113   public void invoke(NotificationContext changeContext) throws Exception
114   {
115     // This allows the listener to work with one change at a time
116     synchronized (_manager)
117     {
118       // Builder keyBuilder = _accessor.keyBuilder();
119       long start = System.currentTimeMillis();
120       if (logger.isInfoEnabled())
121       {
122         logger.info(Thread.currentThread().getId() + " START:INVOKE "
123             + _path + " listener:" + _listener.getClass().getCanonicalName());
124       }
125 
126       if (_changeType == IDEAL_STATE)
127       {
128 
129         IdealStateChangeListener idealStateChangeListener =
130             (IdealStateChangeListener) _listener;
131         subscribeForChanges(changeContext, _path, true, true);
132         List<IdealState> idealStates = _accessor.getChildValues(_propertyKey);
133 
134         idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
135 
136       }
137       else if (_changeType == ChangeType.INSTANCE_CONFIG)
138       {
139         subscribeForChanges(changeContext, _path, true, true);
140       	if (_listener instanceof ConfigChangeListener)
141       	{
142       		ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
143       		List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
144       		configChangeListener.onConfigChange(configs, changeContext);
145       	} else if (_listener instanceof InstanceConfigChangeListener)
146       	{
147       		InstanceConfigChangeListener listener = (InstanceConfigChangeListener) _listener;
148       		List<InstanceConfig> configs = _accessor.getChildValues(_propertyKey);
149       		listener.onInstanceConfigChange(configs, changeContext);    		
150       	}	  
151       }
152       else if (_changeType == CONFIG)
153       {
154             subscribeForChanges(changeContext, _path, true, true);
155       		ScopedConfigChangeListener listener = (ScopedConfigChangeListener) _listener;
156       		List<HelixProperty> configs = _accessor.getChildValues(_propertyKey);
157       		listener.onConfigChange(configs, changeContext);
158       }
159       else if (_changeType == LIVE_INSTANCE)
160       {
161         LiveInstanceChangeListener liveInstanceChangeListener =
162             (LiveInstanceChangeListener) _listener;
163         subscribeForChanges(changeContext, _path, true, true);
164         List<LiveInstance> liveInstances =
165             _accessor.getChildValues(_propertyKey);
166 
167         liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
168 
169       }
170       else if (_changeType == CURRENT_STATE)
171       {
172         CurrentStateChangeListener currentStateChangeListener = (CurrentStateChangeListener) _listener;
173         subscribeForChanges(changeContext, _path, true, true);
174         String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
175 
176         List<CurrentState> currentStates = _accessor.getChildValues(_propertyKey);
177 
178         currentStateChangeListener.onStateChange(instanceName,
179                                                  currentStates,
180                                                  changeContext);
181 
182       }
183       else if (_changeType == MESSAGE)
184       {
185         MessageListener messageListener = (MessageListener) _listener;
186         subscribeForChanges(changeContext, _path, true, false);
187         String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
188         List<Message> messages =
189             _accessor.getChildValues(_propertyKey);
190 
191         messageListener.onMessage(instanceName, messages, changeContext);
192 
193       }
194       else if (_changeType == MESSAGES_CONTROLLER)
195       {
196         MessageListener messageListener = (MessageListener) _listener;
197         subscribeForChanges(changeContext, _path, true, false);
198         List<Message> messages =
199             _accessor.getChildValues(_propertyKey);
200 
201         messageListener.onMessage(_manager.getInstanceName(), messages, changeContext);
202 
203       }
204       else if (_changeType == EXTERNAL_VIEW)
205       {
206         ExternalViewChangeListener externalViewListener =
207             (ExternalViewChangeListener) _listener;
208         subscribeForChanges(changeContext, _path, true, true);
209         List<ExternalView> externalViewList =
210             _accessor.getChildValues(_propertyKey);
211 
212         externalViewListener.onExternalViewChange(externalViewList, changeContext);
213       }
214       else if (_changeType == ChangeType.CONTROLLER)
215       {
216         ControllerChangeListener controllerChangelistener =
217             (ControllerChangeListener) _listener;
218         subscribeForChanges(changeContext, _path, true, false);
219         controllerChangelistener.onControllerChange(changeContext);
220       }
221       else if (_changeType == ChangeType.HEALTH)
222       {
223         HealthStateChangeListener healthStateChangeListener =
224             (HealthStateChangeListener) _listener;
225         subscribeForChanges(changeContext, _path, true, true); // TODO: figure out
226         // settings here
227         String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
228 
229         List<HealthStat> healthReportList =
230             _accessor.getChildValues(_propertyKey);
231 
232         healthStateChangeListener.onHealthChange(instanceName,
233                                                  healthReportList,
234                                                  changeContext);
235       }
236 
237       long end = System.currentTimeMillis();
238       if (logger.isInfoEnabled())
239       {
240         logger.info(Thread.currentThread().getId() + " END:INVOKE " + _path
241             + " listener:" + _listener.getClass().getCanonicalName() + " Took: "
242             + (end - start) +"ms");
243       }
244     }
245   }
246 
247   private void subscribeChildChange(String path, NotificationContext context)
248   {
249 	  NotificationContext.Type type = context.getType();
250       if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK)
251       {
252         logger.info(_manager.getInstanceName() + " subscribes child-change. path: " 
253         		+ path + ", listener: " + _listener);
254         _zkClient.subscribeChildChanges(path, this);
255       }
256       else if (type == NotificationContext.Type.FINALIZE)
257       {
258         logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: " 
259         		+ path + ", listener: " + _listener);
260         
261         _zkClient.unsubscribeChildChanges(path, this);
262       }
263   }
264   
265   private void subscribeDataChange(String path, NotificationContext context)
266   {
267     	NotificationContext.Type type = context.getType();
268         if (type == NotificationContext.Type.INIT
269             || type == NotificationContext.Type.CALLBACK)
270         {
271           if (logger.isDebugEnabled())
272           {
273             logger.debug(_manager.getInstanceName() + " subscribe data-change. path: " 
274             		+ path + ", listener: " + _listener);
275           }
276           _zkClient.subscribeDataChanges(path, this);
277 
278         }
279         else if (type == NotificationContext.Type.FINALIZE)
280         {
281           logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " 
282         		  + path + ", listener: " + _listener);
283 
284           _zkClient.unsubscribeDataChanges(path, this);
285         }
286   }
287   
288   // TODO watchParent is always true. consider remove it
289   private void subscribeForChanges(NotificationContext context,
290                                    String path,
291                                    boolean watchParent,
292                                    boolean watchChild)
293   {
294     if (watchParent)
295     {
296     	subscribeChildChange(path, context);
297     }
298 
299     if (watchChild)
300     {
301       try
302       {
303     	switch(_changeType)
304     	{
305         case CURRENT_STATE:
306         case IDEAL_STATE:
307         case EXTERNAL_VIEW:
308         {
309             // check if bucketized
310         	BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
311         	List<ZNRecord> records = baseAccessor.getChildren(path, null, 0);
312         	for (ZNRecord record : records)
313         	{
314                 HelixProperty property = new HelixProperty(record);
315             	String childPath = path + "/" + record.getId();
316 
317                 int bucketSize = property.getBucketSize();
318                 if (bucketSize > 0)
319                 {
320                   // subscribe both data-change and child-change on bucketized parent node
321                   // data-change gives a delete-callback which is used to remove watch
322                   subscribeChildChange(childPath, context);
323                   subscribeDataChange(childPath, context);
324                   
325                   // subscribe data-change on bucketized child
326                   List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
327                   if (bucketizedChildNames != null) 
328                   {
329                     for (String bucketizedChildName : bucketizedChildNames)
330                     {
331                        String bucketizedChildPath = childPath + "/" + bucketizedChildName;
332                        subscribeDataChange(bucketizedChildPath, context);
333                     }  
334                   }
335                 } else
336                 {
337                     subscribeDataChange(childPath, context);
338                 }
339         	}
340         	break;
341         }
342         default:
343         {
344             List<String> childNames = _zkClient.getChildren(path);
345             if (childNames != null) 
346             {
347               for (String childName : childNames)
348               {
349                  String childPath = path + "/" + childName;
350                  subscribeDataChange(childPath, context);
351               }  
352             }
353         	break;
354         }
355     	}
356       }
357       catch (ZkNoNodeException e)
358       {
359         logger.warn("fail to subscribe child/data change. path: " + path 
360         		+ ", listener: " + _listener, e);
361       }
362     }
363 
364   }
365 
366   public EventType[] getEventTypes()
367   {
368     return _eventTypes;
369   }
370 
371   /**
372    * Invoke the listener so that it sets up the initial values from the zookeeper if any
373    * exists
374    * 
375    */
376   public void init()
377   {
378     updateNotificationTime(System.nanoTime());
379     try
380     {
381       NotificationContext changeContext = new NotificationContext(_manager);
382       changeContext.setType(NotificationContext.Type.INIT);
383       invoke(changeContext);
384     }
385     catch (Exception e)
386     {
387       String msg = "Exception while invoking init callback for listener:"+ _listener;
388       ZKExceptionHandler.getInstance().handle(msg, e);
389     }
390   }
391 
392   @Override
393   public void handleDataChange(String dataPath, Object data)
394   {
395     try
396     {
397       updateNotificationTime(System.nanoTime());
398       if (dataPath != null && dataPath.startsWith(_path))
399       {
400         NotificationContext changeContext = new NotificationContext(_manager);
401         changeContext.setType(NotificationContext.Type.CALLBACK);
402         invoke(changeContext);
403       }
404     }
405     catch (Exception e)
406     {
407       String msg = "exception in handling data-change. path: " + dataPath 
408     		  + ", listener: " + _listener;
409       ZKExceptionHandler.getInstance().handle(msg, e);
410     }
411   }
412 
413   @Override
414   public void handleDataDeleted(String dataPath)
415   {
416     try
417     {
418       updateNotificationTime(System.nanoTime());
419       if (dataPath != null && dataPath.startsWith(_path))
420       {
421           logger.info(_manager.getInstanceName() + " unsubscribe data-change. path: " 
422         		  + dataPath + ", listener: " + _listener);
423           _zkClient.unsubscribeDataChanges(dataPath, this);
424 
425           // only needed for bucketized parent, but OK if we don't have child-change 
426           //  watch on the bucketized parent path
427           logger.info(_manager.getInstanceName() + " unsubscribe child-change. path: "
428         		  + dataPath + ", listener: " + _listener);
429           _zkClient.unsubscribeChildChanges(dataPath, this);
430           // No need to invoke() since this event will handled by child-change on parent-node
431 //        NotificationContext changeContext = new NotificationContext(_manager);
432 //        changeContext.setType(NotificationContext.Type.CALLBACK);
433 // 		  invoke(changeContext);
434       }
435     }
436     catch (Exception e)
437     {
438       String msg = "exception in handling data-delete-change. path: " + dataPath 
439           + ", listener: " + _listener;
440       ZKExceptionHandler.getInstance().handle(msg, e);
441     }
442   }
443 
444   @Override
445   public void handleChildChange(String parentPath, List<String> currentChilds)
446   {
447     try
448     {
449       updateNotificationTime(System.nanoTime());
450       if (parentPath != null && parentPath.startsWith(_path))
451       {
452         NotificationContext changeContext = new NotificationContext(_manager);
453         
454         if (currentChilds == null) {
455           // parentPath has been removed
456           if (parentPath.equals(_path)) {
457             // _path has been removed, remove this listener
458             _manager.removeListener(_propertyKey, _listener);
459           }
460           changeContext.setType(NotificationContext.Type.FINALIZE);
461         } else {
462           changeContext.setType(NotificationContext.Type.CALLBACK);
463         }
464         invoke(changeContext);
465       }
466     }
467     catch (Exception e)
468     {
469       String msg = "exception in handling child-change. instance: " + _manager.getInstanceName() 
470     		  + ", parentPath: " + parentPath + ", listener: " + _listener;
471       ZKExceptionHandler.getInstance().handle(msg, e);
472     }
473   }
474 
475   /**
476    * Invoke the listener for the last time so that the listener could clean up resources
477    * 
478    */
479   public void reset()
480   {
481     try
482     {
483       NotificationContext changeContext = new NotificationContext(_manager);
484       changeContext.setType(NotificationContext.Type.FINALIZE);
485       invoke(changeContext);
486     }
487     catch (Exception e)
488     {
489       String msg = "Exception while resetting the listener:"+_listener;
490       ZKExceptionHandler.getInstance().handle(msg, e);
491     }
492   }
493 
494   private void updateNotificationTime(long nanoTime)
495   {
496     long l = _lastNotificationTimeStamp.get();
497     while (nanoTime > l)
498     {
499       boolean b = _lastNotificationTimeStamp.compareAndSet(l, nanoTime);
500       if (b)
501       {
502         break;
503       }
504       else
505       {
506         l = _lastNotificationTimeStamp.get();
507       }
508     }
509   }
510 
511 }