1 package org.apache.helix.controller;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import org.apache.helix.ConfigChangeListener;
32 import org.apache.helix.ControllerChangeListener;
33 import org.apache.helix.CurrentStateChangeListener;
34 import org.apache.helix.ExternalViewChangeListener;
35 import org.apache.helix.HealthStateChangeListener;
36 import org.apache.helix.HelixDataAccessor;
37 import org.apache.helix.HelixManager;
38 import org.apache.helix.IdealStateChangeListener;
39 import org.apache.helix.LiveInstanceChangeListener;
40 import org.apache.helix.MessageListener;
41 import org.apache.helix.NotificationContext;
42 import org.apache.helix.ZNRecord;
43 import org.apache.helix.NotificationContext.Type;
44 import org.apache.helix.PropertyKey.Builder;
45 import org.apache.helix.controller.pipeline.Pipeline;
46 import org.apache.helix.controller.pipeline.PipelineRegistry;
47 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
48 import org.apache.helix.controller.stages.ClusterEvent;
49 import org.apache.helix.controller.stages.CompatibilityCheckStage;
50 import org.apache.helix.controller.stages.CurrentStateComputationStage;
51 import org.apache.helix.controller.stages.ExternalViewComputeStage;
52 import org.apache.helix.controller.stages.MessageGenerationPhase;
53 import org.apache.helix.controller.stages.MessageSelectionStage;
54 import org.apache.helix.controller.stages.MessageThrottleStage;
55 import org.apache.helix.controller.stages.ReadClusterDataStage;
56 import org.apache.helix.controller.stages.RebalanceIdealStateStage;
57 import org.apache.helix.controller.stages.ResourceComputationStage;
58 import org.apache.helix.controller.stages.TaskAssignmentStage;
59 import org.apache.helix.model.CurrentState;
60 import org.apache.helix.model.ExternalView;
61 import org.apache.helix.model.HealthStat;
62 import org.apache.helix.model.IdealState;
63 import org.apache.helix.model.InstanceConfig;
64 import org.apache.helix.model.LiveInstance;
65 import org.apache.helix.model.Message;
66 import org.apache.helix.model.PauseSignal;
67 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
68 import org.apache.log4j.Logger;
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 public class GenericHelixController implements
88 ConfigChangeListener,
89 IdealStateChangeListener,
90 LiveInstanceChangeListener,
91 MessageListener,
92 CurrentStateChangeListener,
93 ExternalViewChangeListener,
94 ControllerChangeListener,
95 HealthStateChangeListener
96 {
97 private static final Logger logger =
98 Logger.getLogger(GenericHelixController.class.getName());
99 volatile boolean init = false;
100 private final PipelineRegistry _registry;
101
102 final AtomicReference<Map<String, LiveInstance>> _lastSeenInstances;
103 final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions;
104
105 ClusterStatusMonitor _clusterStatusMonitor;
106
107
108
109
110
111
112
113 private boolean _paused;
114
115
116
117
118
119 Timer _rebalanceTimer = null;
120 int _timerPeriod = Integer.MAX_VALUE;
121
122
123
124
125
126
/**
 * Creates a controller backed by the pipeline registry returned from
 * {@link #createDefaultRegistry()}.
 */
public GenericHelixController()
{
  this(createDefaultRegistry());
}
131
132 class RebalanceTask extends TimerTask
133 {
134 HelixManager _manager;
135
136 public RebalanceTask(HelixManager manager)
137 {
138 _manager = manager;
139 }
140
141 @Override
142 public void run()
143 {
144 NotificationContext changeContext = new NotificationContext(_manager);
145 changeContext.setType(NotificationContext.Type.CALLBACK);
146 ClusterEvent event = new ClusterEvent("periodicalRebalance");
147 event.addAttribute("helixmanager", changeContext.getManager());
148 event.addAttribute("changeContext", changeContext);
149 List<ZNRecord> dummy = new ArrayList<ZNRecord>();
150 event.addAttribute("eventData", dummy);
151
152 handleEvent(event);
153 }
154 }
155
156
157
158
159
160
161 void startRebalancingTimer(int period, HelixManager manager)
162 {
163 logger.info("Controller starting timer at period " + period);
164 if(period < _timerPeriod)
165 {
166 if(_rebalanceTimer != null)
167 {
168 _rebalanceTimer.cancel();
169 }
170 _rebalanceTimer = new Timer(true);
171 _timerPeriod = period;
172 _rebalanceTimer.scheduleAtFixedRate(new RebalanceTask(manager), _timerPeriod, _timerPeriod);
173 }
174 else
175 {
176 logger.info("Controller already has timer at period " + _timerPeriod);
177 }
178 }
179
180
181
182
183 void stopRebalancingTimer()
184 {
185 if(_rebalanceTimer != null)
186 {
187 _rebalanceTimer.cancel();
188 _rebalanceTimer = null;
189 }
190 _timerPeriod = Integer.MAX_VALUE;
191 }
192
193 private static PipelineRegistry createDefaultRegistry()
194 {
195 logger.info("createDefaultRegistry");
196 synchronized (GenericHelixController.class)
197 {
198 PipelineRegistry registry = new PipelineRegistry();
199
200
201 Pipeline dataRefresh = new Pipeline();
202 dataRefresh.addStage(new ReadClusterDataStage());
203
204
205 Pipeline rebalancePipeline = new Pipeline();
206 rebalancePipeline.addStage(new ResourceComputationStage());
207 rebalancePipeline.addStage(new CurrentStateComputationStage());
208 rebalancePipeline.addStage(new RebalanceIdealStateStage());
209 rebalancePipeline.addStage(new BestPossibleStateCalcStage());
210 rebalancePipeline.addStage(new MessageGenerationPhase());
211 rebalancePipeline.addStage(new MessageSelectionStage());
212 rebalancePipeline.addStage(new MessageThrottleStage());
213 rebalancePipeline.addStage(new TaskAssignmentStage());
214
215
216 Pipeline externalViewPipeline = new Pipeline();
217 externalViewPipeline.addStage(new ExternalViewComputeStage());
218
219
220 Pipeline liveInstancePipeline = new Pipeline();
221 liveInstancePipeline.addStage(new CompatibilityCheckStage());
222
223 registry.register("idealStateChange", dataRefresh, rebalancePipeline);
224 registry.register("currentStateChange",
225 dataRefresh,
226 rebalancePipeline,
227 externalViewPipeline);
228 registry.register("configChange", dataRefresh, rebalancePipeline);
229 registry.register("liveInstanceChange",
230 dataRefresh,
231 liveInstancePipeline,
232 rebalancePipeline,
233 externalViewPipeline);
234
235 registry.register("messageChange",
236 dataRefresh,
237 rebalancePipeline);
238 registry.register("externalView", dataRefresh);
239 registry.register("resume", dataRefresh, rebalancePipeline, externalViewPipeline);
240 registry.register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
241
242
243
244
245
246
247
248
249 return registry;
250 }
251 }
252
/**
 * Creates a controller that dispatches events through the given registry.
 * The controller starts unpaused, with no previously seen live instances or
 * sessions recorded.
 *
 * @param registry maps event names to the pipelines run for them
 */
public GenericHelixController(PipelineRegistry registry)
{
  _registry = registry;
  _paused = false;
  // Both references start out holding null: nothing has been observed yet.
  _lastSeenSessions = new AtomicReference<Map<String, LiveInstance>>();
  _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
}
260
261
262
263
264
265
266
267 protected synchronized void handleEvent(ClusterEvent event)
268 {
269 HelixManager manager = event.getAttribute("helixmanager");
270 if (manager == null)
271 {
272 logger.error("No cluster manager in event:" + event.getName());
273 return;
274 }
275
276 if (!manager.isLeader())
277 {
278 logger.error("Cluster manager: " + manager.getInstanceName()
279 + " is not leader. Pipeline will not be invoked");
280 return;
281 }
282
283 if (_paused)
284 {
285 logger.info("Cluster is paused. Ignoring the event:" + event.getName());
286 return;
287 }
288
289 NotificationContext context = null;
290 if (event.getAttribute("changeContext") != null)
291 {
292 context = (NotificationContext) (event.getAttribute("changeContext"));
293 }
294
295
296 if (context != null)
297 {
298 if (context.getType() == Type.FINALIZE)
299 {
300 if (_clusterStatusMonitor != null)
301 {
302 _clusterStatusMonitor.reset();
303 _clusterStatusMonitor = null;
304 }
305
306 stopRebalancingTimer();
307 logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
308 return;
309 }
310 else
311 {
312 if (_clusterStatusMonitor == null)
313 {
314 _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
315 }
316
317 event.addAttribute("clusterStatusMonitor", _clusterStatusMonitor);
318 }
319 }
320
321 List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
322 if (pipelines == null || pipelines.size() == 0)
323 {
324 logger.info("No pipeline to run for event:" + event.getName());
325 return;
326 }
327
328 for (Pipeline pipeline : pipelines)
329 {
330 try
331 {
332 pipeline.handle(event);
333 pipeline.finish();
334 }
335 catch (Exception e)
336 {
337 logger.error("Exception while executing pipeline: " + pipeline
338 + ". Will not continue to next pipeline", e);
339 break;
340 }
341 }
342 }
343
344
345
346
347 @Override
348 public void onExternalViewChange(List<ExternalView> externalViewList,
349 NotificationContext changeContext)
350 {
351
352
353
354
355
356
357
358 }
359
360 @Override
361 public void onStateChange(String instanceName,
362 List<CurrentState> statesInfo,
363 NotificationContext changeContext)
364 {
365 logger.info("START: GenericClusterController.onStateChange()");
366 ClusterEvent event = new ClusterEvent("currentStateChange");
367 event.addAttribute("helixmanager", changeContext.getManager());
368 event.addAttribute("instanceName", instanceName);
369 event.addAttribute("changeContext", changeContext);
370 event.addAttribute("eventData", statesInfo);
371 handleEvent(event);
372 logger.info("END: GenericClusterController.onStateChange()");
373 }
374
375 @Override
376 public void onHealthChange(String instanceName,
377 List<HealthStat> reports,
378 NotificationContext changeContext)
379 {
380
381
382
383
384
385 }
386
387 @Override
388 public void onMessage(String instanceName,
389 List<Message> messages,
390 NotificationContext changeContext)
391 {
392 logger.info("START: GenericClusterController.onMessage()");
393
394 ClusterEvent event = new ClusterEvent("messageChange");
395 event.addAttribute("helixmanager", changeContext.getManager());
396 event.addAttribute("instanceName", instanceName);
397 event.addAttribute("changeContext", changeContext);
398 event.addAttribute("eventData", messages);
399 handleEvent(event);
400
401 if (_clusterStatusMonitor != null && messages != null)
402 {
403 _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
404 }
405
406 logger.info("END: GenericClusterController.onMessage()");
407 }
408
409 @Override
410 public void onLiveInstanceChange(List<LiveInstance> liveInstances,
411 NotificationContext changeContext)
412 {
413 logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
414 if (liveInstances == null)
415 {
416 liveInstances = Collections.emptyList();
417 }
418
419
420 if (changeContext.getType() == NotificationContext.Type.INIT ||
421 changeContext.getType() == NotificationContext.Type.CALLBACK)
422 {
423 checkLiveInstancesObservation(liveInstances, changeContext);
424 }
425
426 ClusterEvent event = new ClusterEvent("liveInstanceChange");
427 event.addAttribute("helixmanager", changeContext.getManager());
428 event.addAttribute("changeContext", changeContext);
429 event.addAttribute("eventData", liveInstances);
430 handleEvent(event);
431 logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
432 }
433
434 void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates)
435 {
436 if (manager.getConfigAccessor() == null)
437 {
438 logger.warn(manager.getInstanceName() + " config accessor doesn't exist. should be in file-based mode.");
439 return;
440 }
441
442 for(IdealState idealState : idealStates)
443 {
444 int period = idealState.getRebalanceTimerPeriod();
445 if(period > 0)
446 {
447 startRebalancingTimer(period, manager);
448 }
449 }
450 }
451
452 @Override
453 public void onIdealStateChange(List<IdealState> idealStates,
454 NotificationContext changeContext)
455 {
456 logger.info("START: Generic GenericClusterController.onIdealStateChange()");
457 ClusterEvent event = new ClusterEvent("idealStateChange");
458 event.addAttribute("helixmanager", changeContext.getManager());
459 event.addAttribute("changeContext", changeContext);
460 event.addAttribute("eventData", idealStates);
461 handleEvent(event);
462
463 if(changeContext.getType() != Type.FINALIZE)
464 {
465 checkRebalancingTimer(changeContext.getManager(), idealStates);
466 }
467
468 logger.info("END: Generic GenericClusterController.onIdealStateChange()");
469 }
470
471 @Override
472 public void onConfigChange(List<InstanceConfig> configs,
473 NotificationContext changeContext)
474 {
475 logger.info("START: GenericClusterController.onConfigChange()");
476 ClusterEvent event = new ClusterEvent("configChange");
477 event.addAttribute("changeContext", changeContext);
478 event.addAttribute("helixmanager", changeContext.getManager());
479 event.addAttribute("eventData", configs);
480 handleEvent(event);
481 logger.info("END: GenericClusterController.onConfigChange()");
482 }
483
484 @Override
485 public void onControllerChange(NotificationContext changeContext)
486 {
487 logger.info("START: GenericClusterController.onControllerChange()");
488 if (changeContext!= null && changeContext.getType() == Type.FINALIZE)
489 {
490 logger.info("GenericClusterController.onControllerChange() FINALIZE");
491 return;
492 }
493 HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
494
495
496 Builder keyBuilder = accessor.keyBuilder();
497 LiveInstance leader =
498 accessor.getProperty(keyBuilder.controllerLeader());
499 if (leader == null)
500 {
501 logger.warn("No controller exists for cluster:"
502 + changeContext.getManager().getClusterName());
503 return;
504 }
505 else
506 {
507 String leaderName = leader.getInstanceName();
508
509 String instanceName = changeContext.getManager().getInstanceName();
510 if (leaderName == null || !leaderName.equals(instanceName))
511 {
512 logger.warn("leader name does NOT match, my name: " + instanceName + ", leader: "
513 + leader);
514 return;
515 }
516 }
517
518 PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
519 if (pauseSignal != null)
520 {
521 _paused = true;
522 logger.info("controller is now paused");
523 }
524 else
525 {
526 if (_paused)
527 {
528
529 logger.info("controller is now resumed");
530 _paused = false;
531 ClusterEvent event = new ClusterEvent("resume");
532 event.addAttribute("changeContext", changeContext);
533 event.addAttribute("helixmanager", changeContext.getManager());
534 event.addAttribute("eventData", pauseSignal);
535 handleEvent(event);
536 }
537 else
538 {
539 _paused = false;
540 }
541 }
542 logger.info("END: GenericClusterController.onControllerChange()");
543 }
544
545
546
547
548
549
550
551 protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
552 NotificationContext changeContext)
553 {
554
555
556 Map<String, LiveInstance> curInstances = new HashMap<String, LiveInstance>();
557 Map<String, LiveInstance> curSessions = new HashMap<String, LiveInstance>();
558 for(LiveInstance liveInstance : liveInstances) {
559 curInstances.put(liveInstance.getInstanceName(), liveInstance);
560 curSessions.put(liveInstance.getSessionId(), liveInstance);
561 }
562
563 Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();
564 Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();
565
566 HelixManager manager = changeContext.getManager();
567 Builder keyBuilder = new Builder(manager.getClusterName());
568 if (lastSessions != null) {
569 for (String session : lastSessions.keySet()) {
570 if (!curSessions.containsKey(session)) {
571
572 String instanceName = lastSessions.get(session).getInstanceName();
573 manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
574 }
575 }
576 }
577
578 if (lastInstances != null) {
579 for (String instance : lastInstances.keySet()) {
580 if (!curInstances.containsKey(instance)) {
581
582 manager.removeListener(keyBuilder.messages(instance), this);
583 }
584 }
585 }
586
587 for (String session : curSessions.keySet()) {
588 if (lastSessions == null || !lastSessions.containsKey(session)) {
589 String instanceName = curSessions.get(session).getInstanceName();
590 try {
591
592 manager.addCurrentStateChangeListener(this, instanceName, session);
593 logger.info("Succeed in addling current state listener for instance: " + instanceName + " with session: " + session);
594
595 } catch (Exception e) {
596 logger.error("Fail to add current state listener for instance: "
597 + instanceName + " with session: " + session, e);
598 }
599 }
600 }
601
602 for (String instance : curInstances.keySet()) {
603 if (lastInstances == null || !lastInstances.containsKey(instance)) {
604 try {
605
606 manager.addMessageListener(this, instance);
607 logger.info("Succeed in adding message listener for " + instance);
608 }
609 catch (Exception e)
610 {
611 logger.error("Fail to add message listener for instance: " + instance, e);
612 }
613 }
614 }
615
616
617 _lastSeenInstances.set(curInstances);
618 _lastSeenSessions.set(curSessions);
619
620 }
621
622 }