View Javadoc

1   package org.apache.helix.controller;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Timer;
28  import java.util.TimerTask;
29  import java.util.concurrent.atomic.AtomicReference;
30  
31  import org.apache.helix.ConfigChangeListener;
32  import org.apache.helix.ControllerChangeListener;
33  import org.apache.helix.CurrentStateChangeListener;
34  import org.apache.helix.ExternalViewChangeListener;
35  import org.apache.helix.HealthStateChangeListener;
36  import org.apache.helix.HelixDataAccessor;
37  import org.apache.helix.HelixManager;
38  import org.apache.helix.IdealStateChangeListener;
39  import org.apache.helix.LiveInstanceChangeListener;
40  import org.apache.helix.MessageListener;
41  import org.apache.helix.NotificationContext;
42  import org.apache.helix.ZNRecord;
43  import org.apache.helix.NotificationContext.Type;
44  import org.apache.helix.PropertyKey.Builder;
45  import org.apache.helix.controller.pipeline.Pipeline;
46  import org.apache.helix.controller.pipeline.PipelineRegistry;
47  import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
48  import org.apache.helix.controller.stages.ClusterEvent;
49  import org.apache.helix.controller.stages.CompatibilityCheckStage;
50  import org.apache.helix.controller.stages.CurrentStateComputationStage;
51  import org.apache.helix.controller.stages.ExternalViewComputeStage;
52  import org.apache.helix.controller.stages.MessageGenerationPhase;
53  import org.apache.helix.controller.stages.MessageSelectionStage;
54  import org.apache.helix.controller.stages.MessageThrottleStage;
55  import org.apache.helix.controller.stages.ReadClusterDataStage;
56  import org.apache.helix.controller.stages.RebalanceIdealStateStage;
57  import org.apache.helix.controller.stages.ResourceComputationStage;
58  import org.apache.helix.controller.stages.TaskAssignmentStage;
59  import org.apache.helix.model.CurrentState;
60  import org.apache.helix.model.ExternalView;
61  import org.apache.helix.model.HealthStat;
62  import org.apache.helix.model.IdealState;
63  import org.apache.helix.model.InstanceConfig;
64  import org.apache.helix.model.LiveInstance;
65  import org.apache.helix.model.Message;
66  import org.apache.helix.model.PauseSignal;
67  import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
68  import org.apache.log4j.Logger;
69  
70  
71  /**
72   * The cluster controller's main goal is to keep the cluster state as close as possible to
73   * the Ideal State. It does this by listening to changes in cluster state and scheduling new
74   * tasks to get cluster state to best possible ideal state. Every instance of this class
75   * can control only one cluster.
76   *
77   *
78   * Get all the partitions use IdealState, CurrentState and Messages <br>
79   * foreach partition <br>
80   * 1. get the (instance,state) from IdealState, CurrentState and PendingMessages <br>
81   * 2. compute best possible state (instance,state) pair. This needs previous step data and
82   * state model constraints <br>
83   * 3. compute the messages/tasks needed to move from 1 to 2 <br>
84   * 4. select the messages that can be sent, needs messages and state model constraints <br>
85   * 5. send messages
86   */
87  public class GenericHelixController implements
88      ConfigChangeListener,
89      IdealStateChangeListener,
90      LiveInstanceChangeListener,
91      MessageListener,
92      CurrentStateChangeListener,
93      ExternalViewChangeListener,
94      ControllerChangeListener,
95      HealthStateChangeListener
96  {
97    private static final Logger    logger =
98                                              Logger.getLogger(GenericHelixController.class.getName());
99    volatile boolean               init   = false;
100   private final PipelineRegistry _registry;
101 
102   final AtomicReference<Map<String, LiveInstance>>	_lastSeenInstances;
103   final AtomicReference<Map<String, LiveInstance>>	_lastSeenSessions;
104   
105   ClusterStatusMonitor           _clusterStatusMonitor;
106   
107 
108   /**
109    * The _paused flag is checked by function handleEvent(), while if the flag is set
110    * handleEvent() will be no-op. Other event handling logic keeps the same when the flag
111    * is set.
112    */
113   private boolean                _paused;
114 
115   /**
116    * The timer that can periodically run the rebalancing pipeline. The timer is started
117    * when at least one resource group has the config to use the timer.
118    */
119   Timer _rebalanceTimer = null;
120   int _timerPeriod = Integer.MAX_VALUE;
121 
122   /**
123    * Default constructor that creates a default pipeline registry. This is sufficient in
124    * most cases, but if there is a some thing specific needed use another constructor
125    * where in you can pass a pipeline registry
126    */
127   public GenericHelixController()
128   {
129     this(createDefaultRegistry());
130   }
131 
132   class RebalanceTask extends TimerTask
133   {
134     HelixManager _manager;
135     
136     public RebalanceTask(HelixManager manager)
137     {
138       _manager = manager;
139     }
140     
141     @Override
142     public void run()
143     {
144       NotificationContext changeContext = new NotificationContext(_manager);
145       changeContext.setType(NotificationContext.Type.CALLBACK);
146       ClusterEvent event = new ClusterEvent("periodicalRebalance");
147       event.addAttribute("helixmanager", changeContext.getManager());
148       event.addAttribute("changeContext", changeContext);
149       List<ZNRecord> dummy = new ArrayList<ZNRecord>();
150       event.addAttribute("eventData", dummy);
151       // Should be able to process  
152       handleEvent(event);
153     }
154   }
155   
156   /**
157    * Starts the rebalancing timer with the specified period. Start the timer if necessary;
158    * If the period is smaller than the current period, cancel the current timer and use 
159    * the new period.
160    */
161   void startRebalancingTimer(int period, HelixManager manager)
162   {
163     logger.info("Controller starting timer at period " + period);
164     if(period < _timerPeriod)
165     {
166       if(_rebalanceTimer != null)
167       {
168         _rebalanceTimer.cancel();
169       }
170       _rebalanceTimer = new Timer(true);
171       _timerPeriod = period;
172       _rebalanceTimer.scheduleAtFixedRate(new RebalanceTask(manager), _timerPeriod, _timerPeriod);
173     }
174     else
175     {
176       logger.info("Controller already has timer at period " + _timerPeriod);
177     }
178   }
179   
180   /**
181    * Starts the rebalancing timer 
182    */
183   void stopRebalancingTimer()
184   {
185     if(_rebalanceTimer != null)
186     {
187       _rebalanceTimer.cancel();
188       _rebalanceTimer = null;
189     }
190     _timerPeriod = Integer.MAX_VALUE;
191   }
192   
193   private static PipelineRegistry createDefaultRegistry()
194   {
195     logger.info("createDefaultRegistry");
196     synchronized (GenericHelixController.class)
197     {
198       PipelineRegistry registry = new PipelineRegistry();
199 
200       // cluster data cache refresh
201       Pipeline dataRefresh = new Pipeline();
202       dataRefresh.addStage(new ReadClusterDataStage());
203 
204       // rebalance pipeline
205       Pipeline rebalancePipeline = new Pipeline();
206       rebalancePipeline.addStage(new ResourceComputationStage());
207       rebalancePipeline.addStage(new CurrentStateComputationStage());
208       rebalancePipeline.addStage(new RebalanceIdealStateStage());
209       rebalancePipeline.addStage(new BestPossibleStateCalcStage());
210       rebalancePipeline.addStage(new MessageGenerationPhase());
211       rebalancePipeline.addStage(new MessageSelectionStage());
212       rebalancePipeline.addStage(new MessageThrottleStage());
213       rebalancePipeline.addStage(new TaskAssignmentStage());
214 
215       // external view generation
216       Pipeline externalViewPipeline = new Pipeline();
217       externalViewPipeline.addStage(new ExternalViewComputeStage());
218 
219       // backward compatibility check
220       Pipeline liveInstancePipeline = new Pipeline();
221       liveInstancePipeline.addStage(new CompatibilityCheckStage());
222 
223       registry.register("idealStateChange", dataRefresh, rebalancePipeline);
224       registry.register("currentStateChange",
225                         dataRefresh,
226                         rebalancePipeline,
227                         externalViewPipeline);
228       registry.register("configChange", dataRefresh, rebalancePipeline);
229       registry.register("liveInstanceChange",
230                         dataRefresh,
231                         liveInstancePipeline,
232                         rebalancePipeline,
233                         externalViewPipeline);
234 
235       registry.register("messageChange",
236                         dataRefresh,
237                         rebalancePipeline);
238       registry.register("externalView", dataRefresh);
239       registry.register("resume", dataRefresh, rebalancePipeline, externalViewPipeline);
240       registry.register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
241 
242       // health stats pipeline
243       // Pipeline healthStatsAggregationPipeline = new Pipeline();
244       // StatsAggregationStage statsStage = new StatsAggregationStage();
245       // healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
246       // healthStatsAggregationPipeline.addStage(statsStage);
247       // registry.register("healthChange", healthStatsAggregationPipeline);
248 
249       return registry;
250     }
251   }
252 
253   public GenericHelixController(PipelineRegistry registry)
254   {
255     _paused = false;
256     _registry = registry;
257     _lastSeenInstances = new AtomicReference<Map<String, LiveInstance>>();
258     _lastSeenSessions = new AtomicReference<Map<String,LiveInstance>>();
259   }
260 
261   /**
262    * lock-always: caller always needs to obtain an external lock before call, calls to
263    * handleEvent() should be serialized
264    *
265    * @param event
266    */
267   protected synchronized void handleEvent(ClusterEvent event)
268   {
269     HelixManager manager = event.getAttribute("helixmanager");
270     if (manager == null)
271     {
272       logger.error("No cluster manager in event:" + event.getName());
273       return;
274     }
275 
276     if (!manager.isLeader())
277     {
278       logger.error("Cluster manager: " + manager.getInstanceName()
279           + " is not leader. Pipeline will not be invoked");
280       return;
281     }
282 
283     if (_paused)
284     {
285       logger.info("Cluster is paused. Ignoring the event:" + event.getName());
286       return;
287     }
288 
289     NotificationContext context = null;
290     if (event.getAttribute("changeContext") != null)
291     {
292       context = (NotificationContext) (event.getAttribute("changeContext"));
293     }
294 
295     // Initialize _clusterStatusMonitor
296     if (context != null)
297     {
298       if (context.getType() == Type.FINALIZE)
299       {
300         if (_clusterStatusMonitor != null)
301         {
302           _clusterStatusMonitor.reset();
303           _clusterStatusMonitor = null;
304         }
305         
306         stopRebalancingTimer();
307         logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getName());
308         return;
309       }
310       else
311       {
312         if (_clusterStatusMonitor == null)
313         {
314           _clusterStatusMonitor = new ClusterStatusMonitor(manager.getClusterName());
315         }
316         
317         event.addAttribute("clusterStatusMonitor", _clusterStatusMonitor);
318       }
319     }
320 
321     List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName());
322     if (pipelines == null || pipelines.size() == 0)
323     {
324       logger.info("No pipeline to run for event:" + event.getName());
325       return;
326     }
327 
328     for (Pipeline pipeline : pipelines)
329     {
330       try
331       {
332         pipeline.handle(event);
333         pipeline.finish();
334       }
335       catch (Exception e)
336       {
337         logger.error("Exception while executing pipeline: " + pipeline
338             + ". Will not continue to next pipeline", e);
339         break;
340       }
341     }
342   }
343 
344   // TODO since we read data in pipeline, we can get rid of reading from zookeeper in
345   // callback
346 
347   @Override
348   public void onExternalViewChange(List<ExternalView> externalViewList,
349                                    NotificationContext changeContext)
350   {
351 //    logger.info("START: GenericClusterController.onExternalViewChange()");
352 //    ClusterEvent event = new ClusterEvent("externalViewChange");
353 //    event.addAttribute("helixmanager", changeContext.getManager());
354 //    event.addAttribute("changeContext", changeContext);
355 //    event.addAttribute("eventData", externalViewList);
356 //    // handleEvent(event);
357 //    logger.info("END: GenericClusterController.onExternalViewChange()");
358   }
359 
360   @Override
361   public void onStateChange(String instanceName,
362                             List<CurrentState> statesInfo,
363                             NotificationContext changeContext)
364   {
365     logger.info("START: GenericClusterController.onStateChange()");
366     ClusterEvent event = new ClusterEvent("currentStateChange");
367     event.addAttribute("helixmanager", changeContext.getManager());
368     event.addAttribute("instanceName", instanceName);
369     event.addAttribute("changeContext", changeContext);
370     event.addAttribute("eventData", statesInfo);
371     handleEvent(event);
372     logger.info("END: GenericClusterController.onStateChange()");
373   }
374 
375   @Override
376   public void onHealthChange(String instanceName,
377                              List<HealthStat> reports,
378                              NotificationContext changeContext)
379   {
380     /**
381      * When there are more participant ( > 20, can be in hundreds), This callback can be
382      * called quite frequently as each participant reports health stat every minute. Thus
383      * we change the health check pipeline to run in a timer callback.
384      */
385   }
386 
387   @Override
388   public void onMessage(String instanceName,
389                         List<Message> messages,
390                         NotificationContext changeContext)
391   {
392     logger.info("START: GenericClusterController.onMessage()");
393     
394     ClusterEvent event = new ClusterEvent("messageChange");
395     event.addAttribute("helixmanager", changeContext.getManager());
396     event.addAttribute("instanceName", instanceName);
397     event.addAttribute("changeContext", changeContext);
398     event.addAttribute("eventData", messages);
399     handleEvent(event);
400     
401     if (_clusterStatusMonitor != null && messages != null)
402     {
403       _clusterStatusMonitor.addMessageQueueSize(instanceName, messages.size());
404     }
405         
406     logger.info("END: GenericClusterController.onMessage()");
407   }
408 
409   @Override
410   public void onLiveInstanceChange(List<LiveInstance> liveInstances,
411                                    NotificationContext changeContext)
412   {
413     logger.info("START: Generic GenericClusterController.onLiveInstanceChange()");
414     if (liveInstances == null)
415     {
416       liveInstances = Collections.emptyList();
417     }
418     // Go though the live instance list and make sure that we are observing them
419     // accordingly. The action is done regardless of the paused flag.
420     if (changeContext.getType() == NotificationContext.Type.INIT ||
421         changeContext.getType() == NotificationContext.Type.CALLBACK)
422     {
423       checkLiveInstancesObservation(liveInstances, changeContext);
424     }
425 
426     ClusterEvent event = new ClusterEvent("liveInstanceChange");
427     event.addAttribute("helixmanager", changeContext.getManager());
428     event.addAttribute("changeContext", changeContext);
429     event.addAttribute("eventData", liveInstances);
430     handleEvent(event);
431     logger.info("END: Generic GenericClusterController.onLiveInstanceChange()");
432   }
433   
434   void checkRebalancingTimer(HelixManager manager, List<IdealState> idealStates)
435   {
436     if (manager.getConfigAccessor() == null)
437     {
438       logger.warn(manager.getInstanceName() + " config accessor doesn't exist. should be in file-based mode.");
439       return;
440     }
441     
442     for(IdealState idealState : idealStates)
443     {
444       int period = idealState.getRebalanceTimerPeriod();
445       if(period > 0)
446       {
447         startRebalancingTimer(period, manager);
448       }
449     }
450   }
451   
452   @Override
453   public void onIdealStateChange(List<IdealState> idealStates,
454                                  NotificationContext changeContext)
455   {
456     logger.info("START: Generic GenericClusterController.onIdealStateChange()");
457     ClusterEvent event = new ClusterEvent("idealStateChange");
458     event.addAttribute("helixmanager", changeContext.getManager());
459     event.addAttribute("changeContext", changeContext);
460     event.addAttribute("eventData", idealStates);
461     handleEvent(event);
462     
463     if(changeContext.getType() != Type.FINALIZE)
464     {
465       checkRebalancingTimer(changeContext.getManager(), idealStates);
466     }
467     
468     logger.info("END: Generic GenericClusterController.onIdealStateChange()");
469   }
470 
471   @Override
472   public void onConfigChange(List<InstanceConfig> configs,
473                              NotificationContext changeContext)
474   {
475     logger.info("START: GenericClusterController.onConfigChange()");
476     ClusterEvent event = new ClusterEvent("configChange");
477     event.addAttribute("changeContext", changeContext);
478     event.addAttribute("helixmanager", changeContext.getManager());
479     event.addAttribute("eventData", configs);
480     handleEvent(event);
481     logger.info("END: GenericClusterController.onConfigChange()");
482   }
483 
484   @Override
485   public void onControllerChange(NotificationContext changeContext)
486   {
487     logger.info("START: GenericClusterController.onControllerChange()");
488     if (changeContext!= null && changeContext.getType() == Type.FINALIZE)
489     {
490       logger.info("GenericClusterController.onControllerChange() FINALIZE");
491       return;
492     }
493     HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
494 
495     // double check if this controller is the leader
496     Builder keyBuilder = accessor.keyBuilder();
497     LiveInstance leader =
498         accessor.getProperty(keyBuilder.controllerLeader());
499     if (leader == null)
500     {
501       logger.warn("No controller exists for cluster:"
502           + changeContext.getManager().getClusterName());
503       return;
504     }
505     else
506     {
507       String leaderName = leader.getInstanceName();
508 
509       String instanceName = changeContext.getManager().getInstanceName();
510       if (leaderName == null || !leaderName.equals(instanceName))
511       {
512         logger.warn("leader name does NOT match, my name: " + instanceName + ", leader: "
513             + leader);
514         return;
515       }
516     }
517 
518     PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
519     if (pauseSignal != null)
520     {
521       _paused = true;
522       logger.info("controller is now paused");
523     }
524     else
525     {
526       if (_paused)
527       {
528         // it currently paused
529         logger.info("controller is now resumed");
530         _paused = false;
531         ClusterEvent event = new ClusterEvent("resume");
532         event.addAttribute("changeContext", changeContext);
533         event.addAttribute("helixmanager", changeContext.getManager());
534         event.addAttribute("eventData", pauseSignal);
535         handleEvent(event);
536       }
537       else
538       {
539         _paused = false;
540       }
541     }
542     logger.info("END: GenericClusterController.onControllerChange()");
543   }
544 
545   /**
546    * Go through the list of liveinstances in the cluster, and add currentstateChange
547    * listener and Message listeners to them if they are newly added. For current state
548    * change, the observation is tied to the session id of each live instance.
549    *
550    */
551   protected void checkLiveInstancesObservation(List<LiveInstance> liveInstances,
552                                                NotificationContext changeContext)
553   {
554 
555 	// construct maps for current live-instances
556 	Map<String, LiveInstance> curInstances = new HashMap<String, LiveInstance>();
557 	Map<String, LiveInstance> curSessions = new HashMap<String, LiveInstance>();
558 	for(LiveInstance liveInstance : liveInstances) {
559 		curInstances.put(liveInstance.getInstanceName(), liveInstance);
560 		curSessions.put(liveInstance.getSessionId(), liveInstance);
561 	}
562 
563 	Map<String, LiveInstance> lastInstances = _lastSeenInstances.get();
564 	Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();
565 
566     HelixManager manager = changeContext.getManager();
567     Builder keyBuilder = new Builder(manager.getClusterName());
568     if (lastSessions != null) {
569     	for (String session : lastSessions.keySet()) {
570     		if (!curSessions.containsKey(session)) {
571     			// remove current-state listener for expired session
572     		    String instanceName = lastSessions.get(session).getInstanceName();
573     			manager.removeListener(keyBuilder.currentStates(instanceName, session), this); 
574     		}
575     	}
576     }
577     
578     if (lastInstances != null) {
579     	for (String instance : lastInstances.keySet()) {
580     		if (!curInstances.containsKey(instance)) {
581     			// remove message listener for disconnected instances
582     			manager.removeListener(keyBuilder.messages(instance), this);
583     		}
584     	}
585     }
586     
587 	for (String session : curSessions.keySet()) {
588 		if (lastSessions == null || !lastSessions.containsKey(session)) {
589 	      String instanceName = curSessions.get(session).getInstanceName();
590           try {
591             // add current-state listeners for new sessions
592 	        manager.addCurrentStateChangeListener(this, instanceName, session);
593 	        logger.info("Succeed in addling current state listener for instance: " + instanceName + " with session: " + session);
594 
595           } catch (Exception e) {
596         	  logger.error("Fail to add current state listener for instance: "
597         		  + instanceName + " with session: " + session, e);
598           }
599 		}
600 	}
601 
602 	for (String instance : curInstances.keySet()) {
603 		if (lastInstances == null || !lastInstances.containsKey(instance)) {
604 	        try {
605 	          // add message listeners for new instances
606 	          manager.addMessageListener(this, instance);
607 	          logger.info("Succeed in adding message listener for " + instance);
608 	        }
609 	        catch (Exception e)
610 	        {
611 	          logger.error("Fail to add message listener for instance: " + instance, e);
612 	        }
613 		}
614 	}
615 
616 	// update last-seen
617 	_lastSeenInstances.set(curInstances);
618 	_lastSeenSessions.set(curSessions);
619 
620   }
621 
622 }