View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.lang.management.ManagementFactory;
23  import java.net.InetAddress;
24  import java.net.UnknownHostException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.Timer;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.I0Itec.zkclient.ZkConnection;
33  import org.apache.helix.BaseDataAccessor;
34  import org.apache.helix.ClusterMessagingService;
35  import org.apache.helix.ConfigAccessor;
36  import org.apache.helix.ConfigChangeListener;
37  import org.apache.helix.AccessOption;
38  import org.apache.helix.ControllerChangeListener;
39  import org.apache.helix.CurrentStateChangeListener;
40  import org.apache.helix.ExternalViewChangeListener;
41  import org.apache.helix.HealthStateChangeListener;
42  import org.apache.helix.HelixAdmin;
43  import org.apache.helix.HelixConstants.ChangeType;
44  import org.apache.helix.HelixDataAccessor;
45  import org.apache.helix.HelixException;
46  import org.apache.helix.HelixManager;
47  import org.apache.helix.HelixTimerTask;
48  import org.apache.helix.IdealStateChangeListener;
49  import org.apache.helix.InstanceConfigChangeListener;
50  import org.apache.helix.InstanceType;
51  import org.apache.helix.LiveInstanceChangeListener;
52  import org.apache.helix.LiveInstanceInfoProvider;
53  import org.apache.helix.MessageListener;
54  import org.apache.helix.PreConnectCallback;
55  import org.apache.helix.HelixManagerProperties;
56  import org.apache.helix.PropertyKey;
57  import org.apache.helix.PropertyKey.Builder;
58  import org.apache.helix.PropertyPathConfig;
59  import org.apache.helix.PropertyType;
60  import org.apache.helix.ScopedConfigChangeListener;
61  import org.apache.helix.ZNRecord;
62  import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
63  import org.apache.helix.healthcheck.HealthStatsAggregationTask;
64  import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
65  import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
66  import org.apache.helix.messaging.DefaultMessagingService;
67  import org.apache.helix.messaging.handling.MessageHandlerFactory;
68  import org.apache.helix.model.ConfigScope;
69  import org.apache.helix.model.CurrentState;
70  import org.apache.helix.model.HelixConfigScope;
71  import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
72  import org.apache.helix.model.InstanceConfig;
73  import org.apache.helix.model.LiveInstance;
74  import org.apache.helix.model.Message.MessageType;
75  import org.apache.helix.model.builder.ConfigScopeBuilder;
76  import org.apache.helix.model.builder.HelixConfigScopeBuilder;
77  import org.apache.helix.model.StateModelDefinition;
78  import org.apache.helix.monitoring.ZKPathDataDumpTask;
79  import org.apache.helix.participant.DistClusterControllerElection;
80  import org.apache.helix.participant.HelixStateMachineEngine;
81  import org.apache.helix.participant.StateMachineEngine;
82  import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
83  import org.apache.helix.store.zk.ZkHelixPropertyStore;
84  import org.apache.log4j.Logger;
85  import org.apache.zookeeper.Watcher.Event.EventType;
86  import org.apache.zookeeper.Watcher.Event.KeeperState;
87  
88  
89  public class ZKHelixManager implements HelixManager
90  {
  // log4j logger shared by all instances of this class
  private static Logger                        logger                  =
                                                                           Logger.getLogger(ZKHelixManager.class);
  // maximum number of attempts made by createClient() before giving up
  private static final int                     RETRY_LIMIT             = 3;
  // connection timeout in ms, handed to ZkClient and used when waiting for (re)connection
  private static final int                     CONNECTIONTIMEOUT       = 60 * 1000;
  private final String                         _clusterName;
  private final String                         _instanceName;
  private final String                         _zkConnectString;
  // fallback session timeout in ms when zk.session.timeout is absent, invalid or non-positive
  private static final int                     DEFAULT_SESSION_TIMEOUT = 30 * 1000;
  // the accessors and the zk client below are (re)created by createClient()
  private ZKHelixDataAccessor                  _helixAccessor;
  private ConfigAccessor                       _configAccessor;
  protected ZkClient                           _zkClient;
  // registered callback handlers; the add/remove listener methods touch it under synchronized (this)
  protected final List<CallbackHandler>        _handlers = new ArrayList<CallbackHandler>();
  private final ZkStateChangeListener          _zkStateChangeListener;
  private final InstanceType                   _instanceType;
  // hex string of the current ZooKeeper session id, assigned in handleNewSession()
  volatile String                              _sessionId;
  // timer used by startStatusUpdatedumpTask(); cancelled and nulled in disconnectInternal()
  private Timer                                _timer;
  // callback handler on the controller key, created or re-initialized in handleNewSession()
  private CallbackHandler                      _leaderElectionHandler;
  private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector;
  private final DefaultMessagingService        _messagingService;
  private ZKHelixAdmin                         _managementTool;
  // version string taken from _properties in the constructor
  private final String                         _version;
  private final HelixManagerProperties         _properties;
  private final StateMachineEngine             _stateMachEngine;
  // effective ZooKeeper session timeout in ms, resolved in the constructor
  private int                                  _sessionTimeout;
  private ZkHelixPropertyStore<ZNRecord>       _helixPropertyStore;
  // timer tasks; the constructor adds a HealthStatsAggregationTask for CONTROLLER instances
  private final List<HelixTimerTask>           _controllerTimerTasks;
  private BaseDataAccessor<ZNRecord>           _baseDataAccessor;
  // callbacks invoked by the participant new-session handling before the live instance is added
  List<PreConnectCallback>                     _preConnectCallbacks    =
                                                                           new LinkedList<PreConnectCallback>();
  ZKPropertyTransferServer                     _transferServer         = null;
  // both values below come from system properties (see constructor) and are passed to ZkStateChangeListener
  int                                          _flappingTimeWindowMs; 
  int                                          _maxDisconnectThreshold;
  // NOTE: "WINDIOW" is a misspelling, but the constant is public and therefore part of the API
  public static final int                     FLAPPING_TIME_WINDIOW   = 300000; // Default to 300 sec
  public static final int                     MAX_DISCONNECT_THRESHOLD = 5;
  // optional provider whose record is merged into the live-instance record in addLiveInstance()
  LiveInstanceInfoProvider                    _liveInstanceInfoProvider = null;
  // cluster-scope config key read in handleNewSession() to decide whether an instance may auto-join
  public static final String                  ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
127 
128   public ZKHelixManager(String clusterName,
129                         String instanceName,
130                         InstanceType instanceType,
131                         String zkConnectString)
132   {
133     logger.info("Create a zk-based cluster manager. clusterName:" + clusterName
134         + ", instanceName:" + instanceName + ", type:" + instanceType + ", zkSvr:"
135         + zkConnectString);
136     _flappingTimeWindowMs = FLAPPING_TIME_WINDIOW;
137     try
138     {
139       _flappingTimeWindowMs =
140           Integer.parseInt(System.getProperty("helixmanager.flappingTimeWindow", ""
141               + FLAPPING_TIME_WINDIOW));
142     }
143     catch (NumberFormatException e)
144     {
145       logger.warn("Exception while parsing helixmanager.flappingTimeWindow: "
146           + System.getProperty("helixmanager.flappingTimeWindow", "" + FLAPPING_TIME_WINDIOW));
147     }
148     _maxDisconnectThreshold = MAX_DISCONNECT_THRESHOLD;
149     try
150     {
151       _maxDisconnectThreshold =
152           Integer.parseInt(System.getProperty("helixmanager.maxDisconnectThreshold", ""
153               + MAX_DISCONNECT_THRESHOLD));
154     }
155     catch (NumberFormatException e)
156     {
157       logger.warn("Exception while parsing helixmanager.maxDisconnectThreshold: "
158           + System.getProperty("helixmanager.maxDisconnectThreshold", "" + MAX_DISCONNECT_THRESHOLD));
159     }
160     int sessionTimeoutInt = -1;
161     try
162     {
163       sessionTimeoutInt =
164           Integer.parseInt(System.getProperty("zk.session.timeout", ""
165               + DEFAULT_SESSION_TIMEOUT));
166     }
167     catch (NumberFormatException e)
168     {
169       logger.warn("Exception while parsing session timeout: "
170           + System.getProperty("zk.session.timeout", "" + DEFAULT_SESSION_TIMEOUT));
171     }
172     if (sessionTimeoutInt > 0)
173     {
174       _sessionTimeout = sessionTimeoutInt;
175     }
176     else
177     {
178       _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
179     }
180     if (instanceName == null)
181     {
182       try
183       {
184         instanceName =
185             InetAddress.getLocalHost().getCanonicalHostName() + "-"
186                 + instanceType.toString();
187       }
188       catch (UnknownHostException e)
189       {
190         // can ignore it
191         logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable",
192                     e);
193         instanceName = "UNKNOWN";
194       }
195     }
196 
197     _clusterName = clusterName;
198     _instanceName = instanceName;
199     _instanceType = instanceType;
200     _zkConnectString = zkConnectString;
201     _zkStateChangeListener = new ZkStateChangeListener(this, _flappingTimeWindowMs, _maxDisconnectThreshold);
202     _timer = null;
203 
204     _messagingService = new DefaultMessagingService(this);
205 
206     _properties = 
207         new HelixManagerProperties("cluster-manager-version.properties");
208     _version = _properties.getVersion();
209         
210     _stateMachEngine = new HelixStateMachineEngine(this);
211 
212     // add all timer tasks
213     _controllerTimerTasks = new ArrayList<HelixTimerTask>();
214     if (_instanceType == InstanceType.CONTROLLER)
215     {
216       _controllerTimerTasks.add(new HealthStatsAggregationTask(this));
217     }
218   }
219 
220   @Override
221   public boolean removeListener(PropertyKey key, Object listener)
222   {
223     logger.info("Removing listener: " + listener + " on path: " + key.getPath() 
224     		+ " from cluster: " + _clusterName + " by instance: " + _instanceName);
225 
226     synchronized (this)
227     {
228       List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
229       for (CallbackHandler handler : _handlers)
230       {
231         // compare property-key path and listener reference
232         if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener))
233         {
234           toRemove.add(handler);
235         }
236       }
237       
238       _handlers.removeAll(toRemove);
239       
240       // handler.reset() may modify the handlers list, so do it outside the iteration
241       for (CallbackHandler handler : toRemove) {
242     	  handler.reset();
243       }
244     }
245 
246     return true;
247   }
248 
249   private void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType, EventType[] eventType)
250   {
251     checkConnected();
252 
253     PropertyType type = propertyKey.getType();
254 
255     synchronized (this)
256     {
257       for (CallbackHandler handler : _handlers)
258       {
259         // compare property-key path and listener reference
260         if (handler.getPath().equals(propertyKey.getPath()) && handler.getListener().equals(listener))
261         {
262           logger.info("Listener: " + listener + " on path: " + propertyKey.getPath() + " already exists. skip adding it");
263           return;
264         }
265       }
266       
267       CallbackHandler newHandler =
268           createCallBackHandler(propertyKey, listener, eventType, changeType);
269       _handlers.add(newHandler);
270       logger.info("Add listener: " + listener + " for type: " + type + " to path: " + newHandler.getPath());
271     }
272   }
273   
274   @Override
275   public void addIdealStateChangeListener(final IdealStateChangeListener listener) throws Exception
276   {
277 	  addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE, 
278 	    new EventType[] { EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated });
279   }
280 
281   @Override
282   public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception
283   {
284 	  addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE, 
285 	    new EventType[] { EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
286   }
287 
288   @Override
289   public void addConfigChangeListener(ConfigChangeListener listener)
290   {
291 	 addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG, 
292 	   new EventType[] { EventType.NodeChildrenChanged });
293   }
294 
295   @Override
296   public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
297   {
298 	 addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG, 
299 			 new EventType[] { EventType.NodeChildrenChanged });
300   }
301 
302   @Override
303   public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
304   {
305 	Builder keyBuilder = new Builder(_clusterName);
306 
307 	PropertyKey propertyKey = null;
308 	switch(scope)
309 	{
310 	case CLUSTER:
311 		propertyKey = keyBuilder.clusterConfigs();
312 		break;
313 	case PARTICIPANT:
314 		propertyKey = keyBuilder.instanceConfigs();
315 		break;
316 	case RESOURCE:
317 		propertyKey = keyBuilder.resourceConfigs();
318 		break;
319 	default:
320 		break;
321 	}
322 
323 	if (propertyKey != null)
324 	{
325 		addListener(listener, propertyKey, ChangeType.CONFIG, 
326 				new EventType[] { EventType.NodeChildrenChanged });
327 	} else
328 	{
329 		logger.error("Can't add listener to config scope: " + scope);
330 	}
331   }
332   
333   // TODO: Decide if do we still need this since we are exposing
334   // ClusterMessagingService
335   @Override
336   public void addMessageListener(MessageListener listener, String instanceName)
337   {
338 	 addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE, 
339 	    		new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
340   }
341 
342   void addControllerMessageListener(MessageListener listener)
343   {
344 	 addListener(listener, new Builder(_clusterName).controllerMessages(), ChangeType.MESSAGES_CONTROLLER,
345 	    		new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
346   }
347 
348   @Override
349   public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
350                                             String instanceName,
351                                             String sessionId)
352   {
353 	 addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId), ChangeType.CURRENT_STATE,
354 	    		new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
355   }
356 
357   @Override
358   public void addHealthStateChangeListener(HealthStateChangeListener listener,
359                                            String instanceName)
360   {
361 	  addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH,
362 	    		new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
363   }
364 
365   @Override
366   public void addExternalViewChangeListener(ExternalViewChangeListener listener)
367   {
368 	  addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW,
369 	    		new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
370   }
371 
372   @Override
373   public void addControllerListener(ControllerChangeListener listener)
374   {
375 	  addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER,
376 	    		new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
377   }
378 
379   @Override
380   public HelixDataAccessor getHelixDataAccessor()
381   {
382     checkConnected();
383     return _helixAccessor;
384   }
385 
386   @Override
387   public ConfigAccessor getConfigAccessor()
388   {
389     checkConnected();
390     return _configAccessor;
391   }
392 
393   @Override
394   public String getClusterName()
395   {
396     return _clusterName;
397   }
398 
399   @Override
400   public String getInstanceName()
401   {
402     return _instanceName;
403   }
404 
405   @Override
406   public void connect() throws Exception
407   {
408     logger.info("ClusterManager.connect()");
409     if (_zkStateChangeListener.isConnected())
410     {
411       logger.warn("Cluster manager " + _clusterName + " " + _instanceName
412           + " already connected");
413       return;
414     }
415 
416     try
417     {
418       createClient(_zkConnectString);
419       _messagingService.onConnected();
420     }
421     catch (Exception e)
422     {
423       logger.error(e);
424       disconnect();
425       throw e;
426     }
427   }
428 
429   @Override
430   public void disconnect()
431   {
432     if (!isConnected())
433     {
434       logger.error("ClusterManager " + _instanceName + " already disconnected");
435       return;
436     }
437     disconnectInternal();
438   }
439   
  /**
   * Tears down this manager's cluster connection without checking {@link #isConnected()}
   * first: stops the message executor, resets handlers, shuts down the data accessor, stops
   * the health collector and timers, and finally closes the ZooKeeper client.
   */
  void disconnectInternal()
  {
    // This function can be called when the connection is in a bad state (e.g. flapping),
    // in which case isConnected() could be false and we still want to disconnect from the cluster.
    logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from "
        + _clusterName);

    /**
     * shutdown thread pool first to avoid reset() being invoked in the middle of state
     * transition
     */
    _messagingService.getExecutor().shutdown();
    resetHandlers();

    _helixAccessor.shutdown();

    // the leader-election handler only exists once handleNewSession() has created it
    if (_leaderElectionHandler != null)
    {
      _leaderElectionHandler.reset();
    }

    if (_participantHealthCheckInfoCollector != null)
    {
      _participantHealthCheckInfoCollector.stop();
    }

    // drop the timer reference so startStatusUpdatedumpTask() can create a fresh one later
    if (_timer != null)
    {
      _timer.cancel();
      _timer = null;
    }

    if (_instanceType == InstanceType.CONTROLLER)
    {
      stopTimerTasks();
    }

    // unsubscribe accessor from controllerChange
    _zkClient.unsubscribeAll();

    _zkClient.close();

    // HACK seems that zkClient is not sending DISCONNECT event
    _zkStateChangeListener.disconnect();
    logger.info("Cluster manager: " + _instanceName + " disconnected");
  
  }
487 
488   @Override
489   public String getSessionId()
490   {
491     checkConnected();
492     return _sessionId;
493   }
494 
495   @Override
496   public boolean isConnected()
497   {
498     return _zkStateChangeListener.isConnected();
499   }
500 
501   @Override
502   public long getLastNotificationTime()
503   {
504     return -1;
505   }
506 
507   private void addLiveInstance()
508   {
509     LiveInstance liveInstance = new LiveInstance(_instanceName);
510     liveInstance.setSessionId(_sessionId);
511     liveInstance.setHelixVersion(_version);
512     liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
513     
514     if(_liveInstanceInfoProvider != null)
515     {
516       logger.info("invoking _liveInstanceInfoProvider");
517       ZNRecord additionalLiveInstanceInfo = _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
518       if(additionalLiveInstanceInfo != null)
519       {
520         additionalLiveInstanceInfo.merge(liveInstance.getRecord());
521         ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
522         liveInstance = new LiveInstance(mergedLiveInstance);
523         logger.info("liveInstance content :" + _instanceName + " " + liveInstance.toString());
524       }
525     }
526 
527     logger.info("Add live instance: InstanceName: " + _instanceName + " Session id:"
528         + _sessionId);
529     Builder keyBuilder = _helixAccessor.keyBuilder();
530     if (!_helixAccessor.createProperty(keyBuilder.liveInstance(_instanceName),
531                                        liveInstance))
532     {
533       String errorMsg =
534           "Fail to create live instance node after waiting, so quit. instance:"
535               + _instanceName;
536       logger.warn(errorMsg);
537       throw new HelixException(errorMsg);
538 
539     }
540     String currentStatePathParent =
541         PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
542                                    _clusterName,
543                                    _instanceName,
544                                    getSessionId());
545 
546     if (!_zkClient.exists(currentStatePathParent))
547     {
548       _zkClient.createPersistent(currentStatePathParent);
549       logger.info("Creating current state path " + currentStatePathParent);
550     }
551   }
552 
553   private void startStatusUpdatedumpTask()
554   {
555     long initialDelay = 30 * 60 * 1000;
556     long period = 120 * 60 * 1000;
557     int timeThresholdNoChange = 180 * 60 * 1000;
558 
559     if (_timer == null)
560     {
561       _timer = new Timer(true);
562       _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(this,
563                                                         _zkClient,
564                                                         timeThresholdNoChange),
565                                  initialDelay,
566                                  period);
567     }
568   }
569 
570   private void createClient(String zkServers) throws Exception
571   {
572     String propertyStorePath =
573         PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
574 
575     // by default use ZNRecordStreamingSerializer except for paths within the property
576     // store which expects raw byte[] serialization/deserialization
577     PathBasedZkSerializer zkSerializer =
578         ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer())
579                                .serialize(propertyStorePath, new ByteArraySerializer())
580                                .build();
581 
582     _zkClient = new ZkClient(zkServers, _sessionTimeout, CONNECTIONTIMEOUT, zkSerializer);
583 
584     ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
585         new ZkBaseDataAccessor<ZNRecord>(_zkClient);
586     if (_instanceType == InstanceType.PARTICIPANT)
587     {
588       String curStatePath =
589           PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
590                                      _clusterName,
591                                      _instanceName);
592       _baseDataAccessor =
593           new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
594                                                 Arrays.asList(curStatePath));
595     }
596     else if (_instanceType == InstanceType.CONTROLLER)
597     {
598       String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,
599 
600       _clusterName);
601       _baseDataAccessor =
602           new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
603                                                 Arrays.asList(extViewPath));
604 
605     }
606     else
607     {
608       _baseDataAccessor = baseDataAccessor;
609     }
610 
611     _helixAccessor =
612         new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
613     _configAccessor = new ConfigAccessor(_zkClient);
614     int retryCount = 0;
615 
616     _zkClient.subscribeStateChanges(_zkStateChangeListener);
617     while (retryCount < RETRY_LIMIT)
618     {
619       try
620       {
621         _zkClient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
622         _zkStateChangeListener.handleStateChanged(KeeperState.SyncConnected);
623         _zkStateChangeListener.handleNewSession();
624         break;
625       }
626       catch (HelixException e)
627       {
628         logger.error("fail to createClient.", e);
629         throw e;
630       }
631       catch (Exception e)
632       {
633         retryCount++;
634 
635         logger.error("fail to createClient. retry " + retryCount, e);
636         if (retryCount == RETRY_LIMIT)
637         {
638           throw e;
639         }
640       }
641     }
642   }
643 
644   private CallbackHandler createCallBackHandler(PropertyKey propertyKey,
645                                                 Object listener,
646                                                 EventType[] eventTypes,
647                                                 ChangeType changeType)
648   {
649     if (listener == null)
650     {
651       throw new HelixException("Listener cannot be null");
652     }
653     return new CallbackHandler(this, _zkClient, propertyKey, listener, eventTypes, changeType);
654   }
655 
656   /**
657    * This will be invoked when ever a new session is created<br/>
658    * 
659    * case 1: the cluster manager was a participant carry over current state, add live
660    * instance, and invoke message listener; case 2: the cluster manager was controller and
661    * was a leader before do leader election, and if it becomes leader again, invoke ideal
662    * state listener, current state listener, etc. if it fails to become leader in the new
663    * session, then becomes standby; case 3: the cluster manager was controller and was NOT
664    * a leader before do leader election, and if it becomes leader, instantiate and invoke
665    * ideal state listener, current state listener, etc. if if fails to become leader in
666    * the new session, stay as standby
667    */
668 
669   protected void handleNewSession()
670   {
671     boolean isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
672     while (!isConnected)
673     {
674       logger.error("Could NOT connect to zk server in " + CONNECTIONTIMEOUT + "ms. zkServer: "
675           + _zkConnectString + ", expiredSessionId: " + _sessionId + ", clusterName: "
676           + _clusterName);
677       isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
678     }
679 
680     ZkConnection zkConnection = ((ZkConnection) _zkClient.getConnection());
681     
682     synchronized (this)
683     {
684       _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
685     }
686     _baseDataAccessor.reset();
687 
688     // reset all handlers so they have a chance to unsubscribe zk changes from zkclient
689     // abandon all callback-handlers added in expired session
690     resetHandlers();
691 
692     logger.info("Handling new session, session id:" + _sessionId + ", instance:"
693         + _instanceName + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
694 
695     logger.info(zkConnection.getZookeeper());
696 
697     if (!ZKUtil.isClusterSetup(_clusterName, _zkClient))
698     {
699       throw new HelixException("Initial cluster structure is not set up for cluster:"
700           + _clusterName);
701     }
702     // Read cluster config and see if instance can auto join the cluster
703     boolean autoJoin = false;
704     try
705     {
706       HelixConfigScope scope =
707           new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(getClusterName())
708                                   .build();
709       autoJoin = Boolean.parseBoolean(getConfigAccessor().get(scope, ALLOW_PARTICIPANT_AUTO_JOIN));
710       logger.info("Auto joining " + _clusterName +" is true");
711     }
712     catch(Exception e)
713     {
714     }
715     if (!ZKUtil.isInstanceSetup(_zkClient, _clusterName, _instanceName, _instanceType))
716     {
717       if(!autoJoin)
718       {
719         throw new HelixException("Initial cluster structure is not set up for instance:"
720           + _instanceName + " instanceType:" + _instanceType);
721       }
722       else
723       {
724         logger.info("Auto joining instance " + _instanceName);
725         InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
726         String hostName = _instanceName;
727         String port = "";
728         int lastPos = _instanceName.lastIndexOf("_");
729         if (lastPos > 0)
730         {
731           hostName = _instanceName.substring(0, lastPos);
732           port = _instanceName.substring(lastPos + 1);
733         }
734         instanceConfig.setHostName(hostName);
735         instanceConfig.setPort(port);
736         instanceConfig.setInstanceEnabled(true);
737         getClusterManagmentTool().addInstance(_clusterName, instanceConfig);
738       }
739     }
740 
741     if (_instanceType == InstanceType.PARTICIPANT
742         || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
743     {
744       handleNewSessionAsParticipant();
745     }
746 
747     if (_instanceType == InstanceType.CONTROLLER
748         || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
749     {
750       addControllerMessageListener(_messagingService.getExecutor());
751       MessageHandlerFactory defaultControllerMsgHandlerFactory =
752           new DefaultControllerMessageHandlerFactory();
753       _messagingService.getExecutor()
754                        .registerMessageHandlerFactory(defaultControllerMsgHandlerFactory.getMessageType(),
755                                                       defaultControllerMsgHandlerFactory);
756       MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
757           new DefaultSchedulerMessageHandlerFactory(this);
758       _messagingService.getExecutor()
759                        .registerMessageHandlerFactory(defaultSchedulerMsgHandlerFactory.getMessageType(),
760                                                       defaultSchedulerMsgHandlerFactory);
761       MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
762           new DefaultParticipantErrorMessageHandlerFactory(this);
763       _messagingService.getExecutor()
764                        .registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(),
765                                                       defaultParticipantErrorMessageHandlerFactory);
766 
767       if (_leaderElectionHandler != null) {
768     	  _leaderElectionHandler.reset();
769     	  _leaderElectionHandler.init();
770       } else {
771         _leaderElectionHandler =
772               createCallBackHandler(new Builder(_clusterName).controller(),
773                                     new DistClusterControllerElection(_zkConnectString),
774                                     new EventType[] { EventType.NodeChildrenChanged,
775                                         EventType.NodeDeleted, EventType.NodeCreated },
776                                     ChangeType.CONTROLLER);
777       }
778     }
779 
780     if (_instanceType == InstanceType.PARTICIPANT
781         || _instanceType == InstanceType.CONTROLLER_PARTICIPANT
782         || (_instanceType == InstanceType.CONTROLLER && isLeader()))
783     {
784       initHandlers();
785     }
786   }
787 
788   private void handleNewSessionAsParticipant()
789   {
790     // In case there is a live instance record on zookeeper
791     Builder keyBuilder = _helixAccessor.keyBuilder();
792 
793     if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
794     {
795       logger.warn("Found another instance with same instanceName: " + _instanceName
796           + " in cluster " + _clusterName);
797       // Wait for a while, in case previous storage node exits unexpectedly
798       // and its liveinstance
799       // still hangs around until session timeout happens
800       try
801       {
802         Thread.sleep(_sessionTimeout + 5000);
803       }
804       catch (InterruptedException e)
805       {
806         logger.warn("Sleep interrupted while waiting for previous liveinstance to go away.",
807                     e);
808       }
809 
810       if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
811       {
812         String errorMessage =
813             "instance " + _instanceName + " already has a liveinstance in cluster "
814                 + _clusterName;
815         logger.error(errorMessage);
816         throw new HelixException(errorMessage);
817       }
818     }
819     // Invoke the PreConnectCallbacks
820     for (PreConnectCallback callback : _preConnectCallbacks)
821     {
822       callback.onPreConnect();
823     }
824     addLiveInstance();
825     carryOverPreviousCurrentState();
826 
827     // In case the cluster manager is running as a participant, setup message
828     // listener
829     _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
830                                                     _stateMachEngine);
831     addMessageListener(_messagingService.getExecutor(), _instanceName);
832     addControllerListener(_helixAccessor);
833     
834     ScheduledTaskStateModelFactory stStateModelFactory = new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
835     _stateMachEngine.registerStateModelFactory(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
836 
837     if (_participantHealthCheckInfoCollector == null)
838     {
839       _participantHealthCheckInfoCollector =
840           new ParticipantHealthReportCollectorImpl(this, _instanceName);
841       _participantHealthCheckInfoCollector.start();
842     }
843     // start the participant health check timer, also create zk path for health
844     // check info
845     String healthCheckInfoPath =
846         _helixAccessor.keyBuilder().healthReports(_instanceName).getPath();
847     if (!_zkClient.exists(healthCheckInfoPath))
848     {
849       _zkClient.createPersistent(healthCheckInfoPath, true);
850       logger.info("Creating healthcheck info path " + healthCheckInfoPath);
851     }
852   }
853 
854   @Override
855   public void addPreConnectCallback(PreConnectCallback callback)
856   {
857     logger.info("Adding preconnect callback");
858     _preConnectCallbacks.add(callback);
859   }
860 
861   private void resetHandlers()
862   {
863     synchronized (this)
864     {
865     	if (_handlers != null)
866         {
867             // get a copy of the list and iterate over the copy list
868             // in case handler.reset() will modify the original handler list
869             List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
870             tmpHandlers.addAll(_handlers);
871 
872             for (CallbackHandler handler : tmpHandlers)
873             {
874               handler.reset();
875               logger.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
876             }
877         }
878     }
879   }
880 
881   private void initHandlers()
882   {
883     synchronized (this)
884     {
885     	if (_handlers != null)
886     	{
887     	  // may add new currentState and message listeners during init()
888     	  // so make a copy and iterate over the copy
889     	  List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
890     	  tmpHandlers.addAll(_handlers);
891           for (CallbackHandler handler : tmpHandlers)
892           {
893             handler.init();
894             logger.info("init handler: " + handler.getPath() + ", " + handler.getListener());
895           }
896     	}
897     }
898   }
899 
900   @Override
901   public boolean isLeader()
902   {
903     if (!isConnected())
904     {
905       return false;
906     }
907 
908     if (_instanceType != InstanceType.CONTROLLER)
909     {
910       return false;
911     }
912 
913     Builder keyBuilder = _helixAccessor.keyBuilder();
914     LiveInstance leader = _helixAccessor.getProperty(keyBuilder.controllerLeader());
915     if (leader == null)
916     {
917       return false;
918     }
919     else
920     {
921       String leaderName = leader.getInstanceName();
922       // TODO need check sessionId also, but in distributed mode, leader's
923       // sessionId is
924       // not equal to
925       // the leader znode's sessionId field which is the sessionId of the
926       // controller_participant that
927       // successfully creates the leader node
928       if (leaderName == null || !leaderName.equals(_instanceName))
929       {
930         return false;
931       }
932     }
933     return true;
934   }
935 
936   /**
937    * carry over current-states from last sessions
938    * set to initial state for current session only when the state doesn't exist in current session
939    */
940   private void carryOverPreviousCurrentState()
941   {
942     Builder keyBuilder = _helixAccessor.keyBuilder();
943     List<String> sessions = _helixAccessor.getChildNames(keyBuilder.sessions(_instanceName));
944     
945     // carry-over
946     for (String session : sessions) {
947       if (session.equals(_sessionId)) {
948         continue;
949       }
950       
951       List<CurrentState> lastCurStates = 
952           _helixAccessor.getChildValues(keyBuilder.currentStates(_instanceName, session));
953 
954       for (CurrentState lastCurState : lastCurStates) {
955         logger.info("Carrying over old session: " + session + ", resource: "
956             + lastCurState.getId() + " to current session: " + _sessionId);
957         String stateModelDefRef = lastCurState.getStateModelDefRef();
958         if (stateModelDefRef == null)
959         {
960           logger.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
961               + lastCurState);
962           continue;
963         }
964         StateModelDefinition stateModel =
965             _helixAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
966 
967         String curStatePath = keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName()).getPath();
968         _helixAccessor.getBaseDataAccessor().update(curStatePath, 
969            new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState), AccessOption.PERSISTENT);
970       }
971     }
972     
973     // remove previous current states
974     for (String session : sessions)
975     {
976       if (session.equals(_sessionId)) {
977         continue;
978       }
979 
980       String path =
981           _helixAccessor.keyBuilder()
982                         .currentStates(_instanceName, session)
983                         .getPath();
984       logger.info("Removing current states from previous sessions. path: " + path);
985       _zkClient.deleteRecursive(path);
986     }
987   }
988 
989   @Override
990   public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
991   {
992     checkConnected();
993 
994     if (_helixPropertyStore == null)
995     {
996       String path =
997           PropertyPathConfig.getPath(PropertyType.HELIX_PROPERTYSTORE, _clusterName);
998 
999       _helixPropertyStore =
1000           new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkClient),
1001                                              path,
1002                                              null);
1003     }
1004 
1005     return _helixPropertyStore;
1006   }
1007 
1008   @Override
1009   public synchronized HelixAdmin getClusterManagmentTool()
1010   {
1011     checkConnected();
1012     if (_zkClient != null)
1013     {
1014       _managementTool = new ZKHelixAdmin(_zkClient);
1015     }
1016     else
1017     {
1018       logger.error("Couldn't get ZKClusterManagementTool because zkClient is null");
1019     }
1020 
1021     return _managementTool;
1022   }
1023 
1024   @Override
1025   public ClusterMessagingService getMessagingService()
1026   {
1027     // The caller can register message handler factories on messaging service before the
1028     // helix manager is connected. Thus we do not do connected check here.
1029     return _messagingService;
1030   }
1031 
1032   @Override
1033   public ParticipantHealthReportCollector getHealthReportCollector()
1034   {
1035     checkConnected();
1036     return _participantHealthCheckInfoCollector;
1037   }
1038 
1039   @Override
1040   public InstanceType getInstanceType()
1041   {
1042     return _instanceType;
1043   }
1044 
1045   private void checkConnected()
1046   {
1047     if (!isConnected())
1048     {
1049       throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
1050     }
1051   }
1052 
1053   @Override
1054   public String getVersion()
1055   {
1056     return _version;
1057   }
1058 
1059   @Override
1060   public HelixManagerProperties getProperties() {
1061     return _properties;
1062   }
1063   
1064   @Override
1065   public StateMachineEngine getStateMachineEngine()
1066   {
1067     return _stateMachEngine;
1068   }
1069 
1070   // TODO: rename this and not expose this function as part of interface
1071   @Override
1072   public void startTimerTasks()
1073   {
1074     for (HelixTimerTask task : _controllerTimerTasks)
1075     {
1076       task.start();
1077     }
1078     startStatusUpdatedumpTask();
1079   }
1080 
1081   @Override
1082   public void stopTimerTasks()
1083   {
1084     for (HelixTimerTask task : _controllerTimerTasks)
1085     {
1086       task.stop();
1087     }
1088   }
1089 
1090   @Override
1091   public void setLiveInstanceInfoProvider(
1092       LiveInstanceInfoProvider liveInstanceInfoProvider)
1093   {
1094     _liveInstanceInfoProvider = liveInstanceInfoProvider;
1095   }
1096 }