1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.lang.management.ManagementFactory;
23 import java.net.InetAddress;
24 import java.net.UnknownHostException;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Timer;
30 import java.util.concurrent.TimeUnit;
31
32 import org.I0Itec.zkclient.ZkConnection;
33 import org.apache.helix.BaseDataAccessor;
34 import org.apache.helix.ClusterMessagingService;
35 import org.apache.helix.ConfigAccessor;
36 import org.apache.helix.ConfigChangeListener;
37 import org.apache.helix.AccessOption;
38 import org.apache.helix.ControllerChangeListener;
39 import org.apache.helix.CurrentStateChangeListener;
40 import org.apache.helix.ExternalViewChangeListener;
41 import org.apache.helix.HealthStateChangeListener;
42 import org.apache.helix.HelixAdmin;
43 import org.apache.helix.HelixConstants.ChangeType;
44 import org.apache.helix.HelixDataAccessor;
45 import org.apache.helix.HelixException;
46 import org.apache.helix.HelixManager;
47 import org.apache.helix.HelixTimerTask;
48 import org.apache.helix.IdealStateChangeListener;
49 import org.apache.helix.InstanceConfigChangeListener;
50 import org.apache.helix.InstanceType;
51 import org.apache.helix.LiveInstanceChangeListener;
52 import org.apache.helix.LiveInstanceInfoProvider;
53 import org.apache.helix.MessageListener;
54 import org.apache.helix.PreConnectCallback;
55 import org.apache.helix.HelixManagerProperties;
56 import org.apache.helix.PropertyKey;
57 import org.apache.helix.PropertyKey.Builder;
58 import org.apache.helix.PropertyPathConfig;
59 import org.apache.helix.PropertyType;
60 import org.apache.helix.ScopedConfigChangeListener;
61 import org.apache.helix.ZNRecord;
62 import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
63 import org.apache.helix.healthcheck.HealthStatsAggregationTask;
64 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
65 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
66 import org.apache.helix.messaging.DefaultMessagingService;
67 import org.apache.helix.messaging.handling.MessageHandlerFactory;
68 import org.apache.helix.model.ConfigScope;
69 import org.apache.helix.model.CurrentState;
70 import org.apache.helix.model.HelixConfigScope;
71 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
72 import org.apache.helix.model.InstanceConfig;
73 import org.apache.helix.model.LiveInstance;
74 import org.apache.helix.model.Message.MessageType;
75 import org.apache.helix.model.builder.ConfigScopeBuilder;
76 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
77 import org.apache.helix.model.StateModelDefinition;
78 import org.apache.helix.monitoring.ZKPathDataDumpTask;
79 import org.apache.helix.participant.DistClusterControllerElection;
80 import org.apache.helix.participant.HelixStateMachineEngine;
81 import org.apache.helix.participant.StateMachineEngine;
82 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
83 import org.apache.helix.store.zk.ZkHelixPropertyStore;
84 import org.apache.log4j.Logger;
85 import org.apache.zookeeper.Watcher.Event.EventType;
86 import org.apache.zookeeper.Watcher.Event.KeeperState;
87
88
89 public class ZKHelixManager implements HelixManager
90 {
91 private static Logger logger =
92 Logger.getLogger(ZKHelixManager.class);
93 private static final int RETRY_LIMIT = 3;
94 private static final int CONNECTIONTIMEOUT = 60 * 1000;
95 private final String _clusterName;
96 private final String _instanceName;
97 private final String _zkConnectString;
98 private static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
99 private ZKHelixDataAccessor _helixAccessor;
100 private ConfigAccessor _configAccessor;
101 protected ZkClient _zkClient;
102 protected final List<CallbackHandler> _handlers = new ArrayList<CallbackHandler>();
103 private final ZkStateChangeListener _zkStateChangeListener;
104 private final InstanceType _instanceType;
105 volatile String _sessionId;
106 private Timer _timer;
107 private CallbackHandler _leaderElectionHandler;
108 private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector;
109 private final DefaultMessagingService _messagingService;
110 private ZKHelixAdmin _managementTool;
111 private final String _version;
112 private final HelixManagerProperties _properties;
113 private final StateMachineEngine _stateMachEngine;
114 private int _sessionTimeout;
115 private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
116 private final List<HelixTimerTask> _controllerTimerTasks;
117 private BaseDataAccessor<ZNRecord> _baseDataAccessor;
118 List<PreConnectCallback> _preConnectCallbacks =
119 new LinkedList<PreConnectCallback>();
120 ZKPropertyTransferServer _transferServer = null;
121 int _flappingTimeWindowMs;
122 int _maxDisconnectThreshold;
123 public static final int FLAPPING_TIME_WINDIOW = 300000;
124 public static final int MAX_DISCONNECT_THRESHOLD = 5;
125 LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
126 public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
127
128 public ZKHelixManager(String clusterName,
129 String instanceName,
130 InstanceType instanceType,
131 String zkConnectString)
132 {
133 logger.info("Create a zk-based cluster manager. clusterName:" + clusterName
134 + ", instanceName:" + instanceName + ", type:" + instanceType + ", zkSvr:"
135 + zkConnectString);
136 _flappingTimeWindowMs = FLAPPING_TIME_WINDIOW;
137 try
138 {
139 _flappingTimeWindowMs =
140 Integer.parseInt(System.getProperty("helixmanager.flappingTimeWindow", ""
141 + FLAPPING_TIME_WINDIOW));
142 }
143 catch (NumberFormatException e)
144 {
145 logger.warn("Exception while parsing helixmanager.flappingTimeWindow: "
146 + System.getProperty("helixmanager.flappingTimeWindow", "" + FLAPPING_TIME_WINDIOW));
147 }
148 _maxDisconnectThreshold = MAX_DISCONNECT_THRESHOLD;
149 try
150 {
151 _maxDisconnectThreshold =
152 Integer.parseInt(System.getProperty("helixmanager.maxDisconnectThreshold", ""
153 + MAX_DISCONNECT_THRESHOLD));
154 }
155 catch (NumberFormatException e)
156 {
157 logger.warn("Exception while parsing helixmanager.maxDisconnectThreshold: "
158 + System.getProperty("helixmanager.maxDisconnectThreshold", "" + MAX_DISCONNECT_THRESHOLD));
159 }
160 int sessionTimeoutInt = -1;
161 try
162 {
163 sessionTimeoutInt =
164 Integer.parseInt(System.getProperty("zk.session.timeout", ""
165 + DEFAULT_SESSION_TIMEOUT));
166 }
167 catch (NumberFormatException e)
168 {
169 logger.warn("Exception while parsing session timeout: "
170 + System.getProperty("zk.session.timeout", "" + DEFAULT_SESSION_TIMEOUT));
171 }
172 if (sessionTimeoutInt > 0)
173 {
174 _sessionTimeout = sessionTimeoutInt;
175 }
176 else
177 {
178 _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
179 }
180 if (instanceName == null)
181 {
182 try
183 {
184 instanceName =
185 InetAddress.getLocalHost().getCanonicalHostName() + "-"
186 + instanceType.toString();
187 }
188 catch (UnknownHostException e)
189 {
190
191 logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable",
192 e);
193 instanceName = "UNKNOWN";
194 }
195 }
196
197 _clusterName = clusterName;
198 _instanceName = instanceName;
199 _instanceType = instanceType;
200 _zkConnectString = zkConnectString;
201 _zkStateChangeListener = new ZkStateChangeListener(this, _flappingTimeWindowMs, _maxDisconnectThreshold);
202 _timer = null;
203
204 _messagingService = new DefaultMessagingService(this);
205
206 _properties =
207 new HelixManagerProperties("cluster-manager-version.properties");
208 _version = _properties.getVersion();
209
210 _stateMachEngine = new HelixStateMachineEngine(this);
211
212
213 _controllerTimerTasks = new ArrayList<HelixTimerTask>();
214 if (_instanceType == InstanceType.CONTROLLER)
215 {
216 _controllerTimerTasks.add(new HealthStatsAggregationTask(this));
217 }
218 }
219
220 @Override
221 public boolean removeListener(PropertyKey key, Object listener)
222 {
223 logger.info("Removing listener: " + listener + " on path: " + key.getPath()
224 + " from cluster: " + _clusterName + " by instance: " + _instanceName);
225
226 synchronized (this)
227 {
228 List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
229 for (CallbackHandler handler : _handlers)
230 {
231
232 if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener))
233 {
234 toRemove.add(handler);
235 }
236 }
237
238 _handlers.removeAll(toRemove);
239
240
241 for (CallbackHandler handler : toRemove) {
242 handler.reset();
243 }
244 }
245
246 return true;
247 }
248
249 private void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType, EventType[] eventType)
250 {
251 checkConnected();
252
253 PropertyType type = propertyKey.getType();
254
255 synchronized (this)
256 {
257 for (CallbackHandler handler : _handlers)
258 {
259
260 if (handler.getPath().equals(propertyKey.getPath()) && handler.getListener().equals(listener))
261 {
262 logger.info("Listener: " + listener + " on path: " + propertyKey.getPath() + " already exists. skip adding it");
263 return;
264 }
265 }
266
267 CallbackHandler newHandler =
268 createCallBackHandler(propertyKey, listener, eventType, changeType);
269 _handlers.add(newHandler);
270 logger.info("Add listener: " + listener + " for type: " + type + " to path: " + newHandler.getPath());
271 }
272 }
273
274 @Override
275 public void addIdealStateChangeListener(final IdealStateChangeListener listener) throws Exception
276 {
277 addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE,
278 new EventType[] { EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated });
279 }
280
281 @Override
282 public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception
283 {
284 addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE,
285 new EventType[] { EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
286 }
287
288 @Override
289 public void addConfigChangeListener(ConfigChangeListener listener)
290 {
291 addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
292 new EventType[] { EventType.NodeChildrenChanged });
293 }
294
295 @Override
296 public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
297 {
298 addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG,
299 new EventType[] { EventType.NodeChildrenChanged });
300 }
301
302 @Override
303 public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
304 {
305 Builder keyBuilder = new Builder(_clusterName);
306
307 PropertyKey propertyKey = null;
308 switch(scope)
309 {
310 case CLUSTER:
311 propertyKey = keyBuilder.clusterConfigs();
312 break;
313 case PARTICIPANT:
314 propertyKey = keyBuilder.instanceConfigs();
315 break;
316 case RESOURCE:
317 propertyKey = keyBuilder.resourceConfigs();
318 break;
319 default:
320 break;
321 }
322
323 if (propertyKey != null)
324 {
325 addListener(listener, propertyKey, ChangeType.CONFIG,
326 new EventType[] { EventType.NodeChildrenChanged });
327 } else
328 {
329 logger.error("Can't add listener to config scope: " + scope);
330 }
331 }
332
333
334
335 @Override
336 public void addMessageListener(MessageListener listener, String instanceName)
337 {
338 addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE,
339 new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
340 }
341
342 void addControllerMessageListener(MessageListener listener)
343 {
344 addListener(listener, new Builder(_clusterName).controllerMessages(), ChangeType.MESSAGES_CONTROLLER,
345 new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
346 }
347
348 @Override
349 public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
350 String instanceName,
351 String sessionId)
352 {
353 addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId), ChangeType.CURRENT_STATE,
354 new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
355 }
356
357 @Override
358 public void addHealthStateChangeListener(HealthStateChangeListener listener,
359 String instanceName)
360 {
361 addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH,
362 new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
363 }
364
365 @Override
366 public void addExternalViewChangeListener(ExternalViewChangeListener listener)
367 {
368 addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW,
369 new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
370 }
371
372 @Override
373 public void addControllerListener(ControllerChangeListener listener)
374 {
375 addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER,
376 new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
377 }
378
379 @Override
380 public HelixDataAccessor getHelixDataAccessor()
381 {
382 checkConnected();
383 return _helixAccessor;
384 }
385
386 @Override
387 public ConfigAccessor getConfigAccessor()
388 {
389 checkConnected();
390 return _configAccessor;
391 }
392
393 @Override
394 public String getClusterName()
395 {
396 return _clusterName;
397 }
398
399 @Override
400 public String getInstanceName()
401 {
402 return _instanceName;
403 }
404
405 @Override
406 public void connect() throws Exception
407 {
408 logger.info("ClusterManager.connect()");
409 if (_zkStateChangeListener.isConnected())
410 {
411 logger.warn("Cluster manager " + _clusterName + " " + _instanceName
412 + " already connected");
413 return;
414 }
415
416 try
417 {
418 createClient(_zkConnectString);
419 _messagingService.onConnected();
420 }
421 catch (Exception e)
422 {
423 logger.error(e);
424 disconnect();
425 throw e;
426 }
427 }
428
429 @Override
430 public void disconnect()
431 {
432 if (!isConnected())
433 {
434 logger.error("ClusterManager " + _instanceName + " already disconnected");
435 return;
436 }
437 disconnectInternal();
438 }
439
440 void disconnectInternal()
441 {
442
443
444 logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from "
445 + _clusterName);
446
447
448
449
450
451 _messagingService.getExecutor().shutdown();
452 resetHandlers();
453
454 _helixAccessor.shutdown();
455
456 if (_leaderElectionHandler != null)
457 {
458 _leaderElectionHandler.reset();
459 }
460
461 if (_participantHealthCheckInfoCollector != null)
462 {
463 _participantHealthCheckInfoCollector.stop();
464 }
465
466 if (_timer != null)
467 {
468 _timer.cancel();
469 _timer = null;
470 }
471
472 if (_instanceType == InstanceType.CONTROLLER)
473 {
474 stopTimerTasks();
475 }
476
477
478 _zkClient.unsubscribeAll();
479
480 _zkClient.close();
481
482
483 _zkStateChangeListener.disconnect();
484 logger.info("Cluster manager: " + _instanceName + " disconnected");
485
486 }
487
488 @Override
489 public String getSessionId()
490 {
491 checkConnected();
492 return _sessionId;
493 }
494
495 @Override
496 public boolean isConnected()
497 {
498 return _zkStateChangeListener.isConnected();
499 }
500
501 @Override
502 public long getLastNotificationTime()
503 {
504 return -1;
505 }
506
507 private void addLiveInstance()
508 {
509 LiveInstance liveInstance = new LiveInstance(_instanceName);
510 liveInstance.setSessionId(_sessionId);
511 liveInstance.setHelixVersion(_version);
512 liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
513
514 if(_liveInstanceInfoProvider != null)
515 {
516 logger.info("invoking _liveInstanceInfoProvider");
517 ZNRecord additionalLiveInstanceInfo = _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
518 if(additionalLiveInstanceInfo != null)
519 {
520 additionalLiveInstanceInfo.merge(liveInstance.getRecord());
521 ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName);
522 liveInstance = new LiveInstance(mergedLiveInstance);
523 logger.info("liveInstance content :" + _instanceName + " " + liveInstance.toString());
524 }
525 }
526
527 logger.info("Add live instance: InstanceName: " + _instanceName + " Session id:"
528 + _sessionId);
529 Builder keyBuilder = _helixAccessor.keyBuilder();
530 if (!_helixAccessor.createProperty(keyBuilder.liveInstance(_instanceName),
531 liveInstance))
532 {
533 String errorMsg =
534 "Fail to create live instance node after waiting, so quit. instance:"
535 + _instanceName;
536 logger.warn(errorMsg);
537 throw new HelixException(errorMsg);
538
539 }
540 String currentStatePathParent =
541 PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
542 _clusterName,
543 _instanceName,
544 getSessionId());
545
546 if (!_zkClient.exists(currentStatePathParent))
547 {
548 _zkClient.createPersistent(currentStatePathParent);
549 logger.info("Creating current state path " + currentStatePathParent);
550 }
551 }
552
553 private void startStatusUpdatedumpTask()
554 {
555 long initialDelay = 30 * 60 * 1000;
556 long period = 120 * 60 * 1000;
557 int timeThresholdNoChange = 180 * 60 * 1000;
558
559 if (_timer == null)
560 {
561 _timer = new Timer(true);
562 _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(this,
563 _zkClient,
564 timeThresholdNoChange),
565 initialDelay,
566 period);
567 }
568 }
569
570 private void createClient(String zkServers) throws Exception
571 {
572 String propertyStorePath =
573 PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
574
575
576
577 PathBasedZkSerializer zkSerializer =
578 ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer())
579 .serialize(propertyStorePath, new ByteArraySerializer())
580 .build();
581
582 _zkClient = new ZkClient(zkServers, _sessionTimeout, CONNECTIONTIMEOUT, zkSerializer);
583
584 ZkBaseDataAccessor<ZNRecord> baseDataAccessor =
585 new ZkBaseDataAccessor<ZNRecord>(_zkClient);
586 if (_instanceType == InstanceType.PARTICIPANT)
587 {
588 String curStatePath =
589 PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
590 _clusterName,
591 _instanceName);
592 _baseDataAccessor =
593 new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
594 Arrays.asList(curStatePath));
595 }
596 else if (_instanceType == InstanceType.CONTROLLER)
597 {
598 String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,
599
600 _clusterName);
601 _baseDataAccessor =
602 new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
603 Arrays.asList(extViewPath));
604
605 }
606 else
607 {
608 _baseDataAccessor = baseDataAccessor;
609 }
610
611 _helixAccessor =
612 new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
613 _configAccessor = new ConfigAccessor(_zkClient);
614 int retryCount = 0;
615
616 _zkClient.subscribeStateChanges(_zkStateChangeListener);
617 while (retryCount < RETRY_LIMIT)
618 {
619 try
620 {
621 _zkClient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
622 _zkStateChangeListener.handleStateChanged(KeeperState.SyncConnected);
623 _zkStateChangeListener.handleNewSession();
624 break;
625 }
626 catch (HelixException e)
627 {
628 logger.error("fail to createClient.", e);
629 throw e;
630 }
631 catch (Exception e)
632 {
633 retryCount++;
634
635 logger.error("fail to createClient. retry " + retryCount, e);
636 if (retryCount == RETRY_LIMIT)
637 {
638 throw e;
639 }
640 }
641 }
642 }
643
644 private CallbackHandler createCallBackHandler(PropertyKey propertyKey,
645 Object listener,
646 EventType[] eventTypes,
647 ChangeType changeType)
648 {
649 if (listener == null)
650 {
651 throw new HelixException("Listener cannot be null");
652 }
653 return new CallbackHandler(this, _zkClient, propertyKey, listener, eventTypes, changeType);
654 }
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669 protected void handleNewSession()
670 {
671 boolean isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
672 while (!isConnected)
673 {
674 logger.error("Could NOT connect to zk server in " + CONNECTIONTIMEOUT + "ms. zkServer: "
675 + _zkConnectString + ", expiredSessionId: " + _sessionId + ", clusterName: "
676 + _clusterName);
677 isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS);
678 }
679
680 ZkConnection zkConnection = ((ZkConnection) _zkClient.getConnection());
681
682 synchronized (this)
683 {
684 _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
685 }
686 _baseDataAccessor.reset();
687
688
689
690 resetHandlers();
691
692 logger.info("Handling new session, session id:" + _sessionId + ", instance:"
693 + _instanceName + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
694
695 logger.info(zkConnection.getZookeeper());
696
697 if (!ZKUtil.isClusterSetup(_clusterName, _zkClient))
698 {
699 throw new HelixException("Initial cluster structure is not set up for cluster:"
700 + _clusterName);
701 }
702
703 boolean autoJoin = false;
704 try
705 {
706 HelixConfigScope scope =
707 new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(getClusterName())
708 .build();
709 autoJoin = Boolean.parseBoolean(getConfigAccessor().get(scope, ALLOW_PARTICIPANT_AUTO_JOIN));
710 logger.info("Auto joining " + _clusterName +" is true");
711 }
712 catch(Exception e)
713 {
714 }
715 if (!ZKUtil.isInstanceSetup(_zkClient, _clusterName, _instanceName, _instanceType))
716 {
717 if(!autoJoin)
718 {
719 throw new HelixException("Initial cluster structure is not set up for instance:"
720 + _instanceName + " instanceType:" + _instanceType);
721 }
722 else
723 {
724 logger.info("Auto joining instance " + _instanceName);
725 InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
726 String hostName = _instanceName;
727 String port = "";
728 int lastPos = _instanceName.lastIndexOf("_");
729 if (lastPos > 0)
730 {
731 hostName = _instanceName.substring(0, lastPos);
732 port = _instanceName.substring(lastPos + 1);
733 }
734 instanceConfig.setHostName(hostName);
735 instanceConfig.setPort(port);
736 instanceConfig.setInstanceEnabled(true);
737 getClusterManagmentTool().addInstance(_clusterName, instanceConfig);
738 }
739 }
740
741 if (_instanceType == InstanceType.PARTICIPANT
742 || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
743 {
744 handleNewSessionAsParticipant();
745 }
746
747 if (_instanceType == InstanceType.CONTROLLER
748 || _instanceType == InstanceType.CONTROLLER_PARTICIPANT)
749 {
750 addControllerMessageListener(_messagingService.getExecutor());
751 MessageHandlerFactory defaultControllerMsgHandlerFactory =
752 new DefaultControllerMessageHandlerFactory();
753 _messagingService.getExecutor()
754 .registerMessageHandlerFactory(defaultControllerMsgHandlerFactory.getMessageType(),
755 defaultControllerMsgHandlerFactory);
756 MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
757 new DefaultSchedulerMessageHandlerFactory(this);
758 _messagingService.getExecutor()
759 .registerMessageHandlerFactory(defaultSchedulerMsgHandlerFactory.getMessageType(),
760 defaultSchedulerMsgHandlerFactory);
761 MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
762 new DefaultParticipantErrorMessageHandlerFactory(this);
763 _messagingService.getExecutor()
764 .registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(),
765 defaultParticipantErrorMessageHandlerFactory);
766
767 if (_leaderElectionHandler != null) {
768 _leaderElectionHandler.reset();
769 _leaderElectionHandler.init();
770 } else {
771 _leaderElectionHandler =
772 createCallBackHandler(new Builder(_clusterName).controller(),
773 new DistClusterControllerElection(_zkConnectString),
774 new EventType[] { EventType.NodeChildrenChanged,
775 EventType.NodeDeleted, EventType.NodeCreated },
776 ChangeType.CONTROLLER);
777 }
778 }
779
780 if (_instanceType == InstanceType.PARTICIPANT
781 || _instanceType == InstanceType.CONTROLLER_PARTICIPANT
782 || (_instanceType == InstanceType.CONTROLLER && isLeader()))
783 {
784 initHandlers();
785 }
786 }
787
788 private void handleNewSessionAsParticipant()
789 {
790
791 Builder keyBuilder = _helixAccessor.keyBuilder();
792
793 if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
794 {
795 logger.warn("Found another instance with same instanceName: " + _instanceName
796 + " in cluster " + _clusterName);
797
798
799
800 try
801 {
802 Thread.sleep(_sessionTimeout + 5000);
803 }
804 catch (InterruptedException e)
805 {
806 logger.warn("Sleep interrupted while waiting for previous liveinstance to go away.",
807 e);
808 }
809
810 if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null)
811 {
812 String errorMessage =
813 "instance " + _instanceName + " already has a liveinstance in cluster "
814 + _clusterName;
815 logger.error(errorMessage);
816 throw new HelixException(errorMessage);
817 }
818 }
819
820 for (PreConnectCallback callback : _preConnectCallbacks)
821 {
822 callback.onPreConnect();
823 }
824 addLiveInstance();
825 carryOverPreviousCurrentState();
826
827
828
829 _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
830 _stateMachEngine);
831 addMessageListener(_messagingService.getExecutor(), _instanceName);
832 addControllerListener(_helixAccessor);
833
834 ScheduledTaskStateModelFactory stStateModelFactory = new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
835 _stateMachEngine.registerStateModelFactory(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory);
836
837 if (_participantHealthCheckInfoCollector == null)
838 {
839 _participantHealthCheckInfoCollector =
840 new ParticipantHealthReportCollectorImpl(this, _instanceName);
841 _participantHealthCheckInfoCollector.start();
842 }
843
844
845 String healthCheckInfoPath =
846 _helixAccessor.keyBuilder().healthReports(_instanceName).getPath();
847 if (!_zkClient.exists(healthCheckInfoPath))
848 {
849 _zkClient.createPersistent(healthCheckInfoPath, true);
850 logger.info("Creating healthcheck info path " + healthCheckInfoPath);
851 }
852 }
853
854 @Override
855 public void addPreConnectCallback(PreConnectCallback callback)
856 {
857 logger.info("Adding preconnect callback");
858 _preConnectCallbacks.add(callback);
859 }
860
861 private void resetHandlers()
862 {
863 synchronized (this)
864 {
865 if (_handlers != null)
866 {
867
868
869 List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
870 tmpHandlers.addAll(_handlers);
871
872 for (CallbackHandler handler : tmpHandlers)
873 {
874 handler.reset();
875 logger.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
876 }
877 }
878 }
879 }
880
881 private void initHandlers()
882 {
883 synchronized (this)
884 {
885 if (_handlers != null)
886 {
887
888
889 List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
890 tmpHandlers.addAll(_handlers);
891 for (CallbackHandler handler : tmpHandlers)
892 {
893 handler.init();
894 logger.info("init handler: " + handler.getPath() + ", " + handler.getListener());
895 }
896 }
897 }
898 }
899
900 @Override
901 public boolean isLeader()
902 {
903 if (!isConnected())
904 {
905 return false;
906 }
907
908 if (_instanceType != InstanceType.CONTROLLER)
909 {
910 return false;
911 }
912
913 Builder keyBuilder = _helixAccessor.keyBuilder();
914 LiveInstance leader = _helixAccessor.getProperty(keyBuilder.controllerLeader());
915 if (leader == null)
916 {
917 return false;
918 }
919 else
920 {
921 String leaderName = leader.getInstanceName();
922
923
924
925
926
927
928 if (leaderName == null || !leaderName.equals(_instanceName))
929 {
930 return false;
931 }
932 }
933 return true;
934 }
935
936
937
938
939
940 private void carryOverPreviousCurrentState()
941 {
942 Builder keyBuilder = _helixAccessor.keyBuilder();
943 List<String> sessions = _helixAccessor.getChildNames(keyBuilder.sessions(_instanceName));
944
945
946 for (String session : sessions) {
947 if (session.equals(_sessionId)) {
948 continue;
949 }
950
951 List<CurrentState> lastCurStates =
952 _helixAccessor.getChildValues(keyBuilder.currentStates(_instanceName, session));
953
954 for (CurrentState lastCurState : lastCurStates) {
955 logger.info("Carrying over old session: " + session + ", resource: "
956 + lastCurState.getId() + " to current session: " + _sessionId);
957 String stateModelDefRef = lastCurState.getStateModelDefRef();
958 if (stateModelDefRef == null)
959 {
960 logger.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
961 + lastCurState);
962 continue;
963 }
964 StateModelDefinition stateModel =
965 _helixAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
966
967 String curStatePath = keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName()).getPath();
968 _helixAccessor.getBaseDataAccessor().update(curStatePath,
969 new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState), AccessOption.PERSISTENT);
970 }
971 }
972
973
974 for (String session : sessions)
975 {
976 if (session.equals(_sessionId)) {
977 continue;
978 }
979
980 String path =
981 _helixAccessor.keyBuilder()
982 .currentStates(_instanceName, session)
983 .getPath();
984 logger.info("Removing current states from previous sessions. path: " + path);
985 _zkClient.deleteRecursive(path);
986 }
987 }
988
989 @Override
990 public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore()
991 {
992 checkConnected();
993
994 if (_helixPropertyStore == null)
995 {
996 String path =
997 PropertyPathConfig.getPath(PropertyType.HELIX_PROPERTYSTORE, _clusterName);
998
999 _helixPropertyStore =
1000 new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkClient),
1001 path,
1002 null);
1003 }
1004
1005 return _helixPropertyStore;
1006 }
1007
1008 @Override
1009 public synchronized HelixAdmin getClusterManagmentTool()
1010 {
1011 checkConnected();
1012 if (_zkClient != null)
1013 {
1014 _managementTool = new ZKHelixAdmin(_zkClient);
1015 }
1016 else
1017 {
1018 logger.error("Couldn't get ZKClusterManagementTool because zkClient is null");
1019 }
1020
1021 return _managementTool;
1022 }
1023
1024 @Override
1025 public ClusterMessagingService getMessagingService()
1026 {
1027
1028
1029 return _messagingService;
1030 }
1031
1032 @Override
1033 public ParticipantHealthReportCollector getHealthReportCollector()
1034 {
1035 checkConnected();
1036 return _participantHealthCheckInfoCollector;
1037 }
1038
1039 @Override
1040 public InstanceType getInstanceType()
1041 {
1042 return _instanceType;
1043 }
1044
1045 private void checkConnected()
1046 {
1047 if (!isConnected())
1048 {
1049 throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
1050 }
1051 }
1052
1053 @Override
1054 public String getVersion()
1055 {
1056 return _version;
1057 }
1058
1059 @Override
1060 public HelixManagerProperties getProperties() {
1061 return _properties;
1062 }
1063
1064 @Override
1065 public StateMachineEngine getStateMachineEngine()
1066 {
1067 return _stateMachEngine;
1068 }
1069
1070
1071 @Override
1072 public void startTimerTasks()
1073 {
1074 for (HelixTimerTask task : _controllerTimerTasks)
1075 {
1076 task.start();
1077 }
1078 startStatusUpdatedumpTask();
1079 }
1080
1081 @Override
1082 public void stopTimerTasks()
1083 {
1084 for (HelixTimerTask task : _controllerTimerTasks)
1085 {
1086 task.stop();
1087 }
1088 }
1089
1090 @Override
1091 public void setLiveInstanceInfoProvider(
1092 LiveInstanceInfoProvider liveInstanceInfoProvider)
1093 {
1094 _liveInstanceInfoProvider = liveInstanceInfoProvider;
1095 }
1096 }