View Javadoc

1   package org.apache.helix.controller.stages;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.TreeMap;
31  
32  import org.apache.helix.HelixConstants;
33  import org.apache.helix.HelixDefinedState;
34  import org.apache.helix.HelixManager;
35  import org.apache.helix.ZNRecord;
36  import org.apache.helix.HelixConstants.StateModelToken;
37  import org.apache.helix.controller.pipeline.AbstractBaseStage;
38  import org.apache.helix.controller.pipeline.StageException;
39  import org.apache.helix.model.CurrentState;
40  import org.apache.helix.model.IdealState;
41  import org.apache.helix.model.LiveInstance;
42  import org.apache.helix.model.Partition;
43  import org.apache.helix.model.Resource;
44  import org.apache.helix.model.StateModelDefinition;
45  import org.apache.helix.model.IdealState.IdealStateModeProperty;
46  import org.apache.log4j.Logger;
47  
48  
49  /**
50   * For partition compute best possible (instance,state) pair based on
51   * IdealState,StateModel,LiveInstance
52   * 
53   */
54  // TODO: refactor this
55  public class BestPossibleStateCalcStage extends AbstractBaseStage
56  {
57    private static final Logger logger =
58        Logger.getLogger(BestPossibleStateCalcStage.class.getName());
59  
60    @Override
61    public void process(ClusterEvent event) throws Exception
62    {
63      long startTime = System.currentTimeMillis();
64      logger.info("START BestPossibleStateCalcStage.process()");
65  
66      CurrentStateOutput currentStateOutput =
67          event.getAttribute(AttributeName.CURRENT_STATE.toString());
68      Map<String, Resource> resourceMap =
69          event.getAttribute(AttributeName.RESOURCES.toString());
70      ClusterDataCache cache = event.getAttribute("ClusterDataCache");
71  
72      if (currentStateOutput == null || resourceMap == null || cache == null)
73      {
74        throw new StageException("Missing attributes in event:" + event
75            + ". Requires CURRENT_STATE|RESOURCES|DataCache");
76      }
77  
78      BestPossibleStateOutput bestPossibleStateOutput =
79          compute(event, resourceMap, currentStateOutput);
80      event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(),
81                         bestPossibleStateOutput);
82  
83      long endTime = System.currentTimeMillis();
84      logger.info("END BestPossibleStateCalcStage.process(). took: "
85          + (endTime - startTime) + " ms");
86    }
87  
88    private BestPossibleStateOutput compute(ClusterEvent event,
89                                            Map<String, Resource> resourceMap,
90                                            CurrentStateOutput currentStateOutput)
91    {
92      // for each ideal state
93      // read the state model def
94      // for each resource
95      // get the preference list
96      // for each instanceName check if its alive then assign a state
97      ClusterDataCache cache = event.getAttribute("ClusterDataCache");
98      HelixManager manager = event.getAttribute("helixmanager");
99  
100     BestPossibleStateOutput output = new BestPossibleStateOutput();
101 
102     for (String resourceName : resourceMap.keySet())
103     {
104       logger.debug("Processing resource:" + resourceName);
105 
106       Resource resource = resourceMap.get(resourceName);
107       // Ideal state may be gone. In that case we need to get the state model name
108       // from the current state
109       IdealState idealState = cache.getIdealState(resourceName);
110 
111       String stateModelDefName;
112 
113       if (idealState == null)
114       {
115         // if ideal state is deleted, use an empty one
116         logger.info("resource:" + resourceName + " does not exist anymore");
117         stateModelDefName = currentStateOutput.getResourceStateModelDef(resourceName);
118         idealState = new IdealState(resourceName);
119       }
120       else
121       {
122         stateModelDefName = idealState.getStateModelDefRef();
123       }
124 
125       StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
126       if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO_REBALANCE)
127       {
128         calculateAutoBalancedIdealState(cache,
129                                         idealState,
130                                         stateModelDef,
131                                         currentStateOutput);
132       }
133 
134       
135       for (Partition partition : resource.getPartitions())
136       {
137         Map<String, String> currentStateMap =
138             currentStateOutput.getCurrentStateMap(resourceName, partition);
139 
140         Map<String, String> bestStateForPartition;
141         Set<String> disabledInstancesForPartition =
142             cache.getDisabledInstancesForPartition(partition.toString());
143 
144         if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
145         {
146           Map<String, String> idealStateMap =
147               idealState.getInstanceStateMap(partition.getPartitionName());
148           bestStateForPartition =
149               computeCustomizedBestStateForPartition(cache,
150                                                      stateModelDef,
151                                                      idealStateMap,
152                                                      currentStateMap,
153                                                      disabledInstancesForPartition);
154         }
155         else
156         // both AUTO and AUTO_REBALANCE mode
157         {
158           List<String> instancePreferenceList =
159               getPreferenceList(cache, partition, idealState, stateModelDef);
160           
161           bestStateForPartition =
162               computeAutoBestStateForPartition(cache,
163                                                stateModelDef,
164                                                instancePreferenceList,
165                                                currentStateMap,
166                                                disabledInstancesForPartition);
167         }
168         output.setState(resourceName, partition, bestStateForPartition);
169       }
170     }
171     return output;
172   }
173 
174   /**
175    * Compute best state for resource in AUTO_REBALANCE ideal state mode. the algorithm
176    * will make sure that the master partition are evenly distributed; Also when instances
177    * are added / removed, the amount of diff in master partitions are minimized
178    * 
179    * @param cache
180    * @param idealState
181    * @param instancePreferenceList
182    * @param stateModelDef
183    * @param currentStateOutput
184    * @return
185    */
186   private void calculateAutoBalancedIdealState(ClusterDataCache cache,
187                                                IdealState idealState,
188                                                StateModelDefinition stateModelDef,
189                                                CurrentStateOutput currentStateOutput)
190   {
191     String topStateValue = stateModelDef.getStatesPriorityList().get(0);
192     Set<String> liveInstances = cache._liveInstanceMap.keySet();
193     Set<String> taggedInstances = new HashSet<String>();
194     
195     // If there are instances tagged with resource name, use only those instances
196     if(idealState.getInstanceGroupTag() != null)
197     {
198       for(String instanceName : liveInstances)
199       {
200         if(cache._instanceConfigMap.get(instanceName).containsTag(idealState.getInstanceGroupTag()))
201         {
202           taggedInstances.add(instanceName);
203         }
204       }
205     }
206     if(taggedInstances.size() > 0)
207     {
208       logger.info("found the following instances with tag " + idealState.getResourceName() + " " + taggedInstances);
209       liveInstances = taggedInstances;
210     }
211     // Obtain replica number
212     int replicas = 1;
213     try
214     {
215       replicas = Integer.parseInt(idealState.getReplicas());
216     }
217     catch (Exception e)
218     {
219       logger.error("", e);
220     }
221     // Init for all partitions with empty list
222     Map<String, List<String>> defaultListFields = new TreeMap<String, List<String>>();
223     List<String> emptyList = new ArrayList<String>(0);
224     for (String partition : idealState.getPartitionSet())
225     {
226       defaultListFields.put(partition, emptyList);
227     }
228     idealState.getRecord().setListFields(defaultListFields);
229     // Return if no live instance
230     if (liveInstances.size() == 0)
231     {
232       logger.info("No live instances, return. Idealstate : "
233           + idealState.getResourceName());
234       return;
235     }
236     Map<String, List<String>> masterAssignmentMap = new HashMap<String, List<String>>();
237     for (String instanceName : liveInstances)
238     {
239       masterAssignmentMap.put(instanceName, new ArrayList<String>());
240     }
241     Set<String> orphanedPartitions = new HashSet<String>();
242     orphanedPartitions.addAll(idealState.getPartitionSet());
243     // Go through all current states and fill the assignments
244     for (String liveInstanceName : liveInstances)
245     {
246       CurrentState currentState =
247           cache.getCurrentState(liveInstanceName,
248                                 cache.getLiveInstances()
249                                      .get(liveInstanceName)
250                                      .getSessionId()).get(idealState.getId());
251       if (currentState != null)
252       {
253         Map<String, String> partitionStates = currentState.getPartitionStateMap();
254         for (String partitionName : partitionStates.keySet())
255         {
256           String state = partitionStates.get(partitionName);
257           if (state.equals(topStateValue))
258           {
259             masterAssignmentMap.get(liveInstanceName).add(partitionName);
260             orphanedPartitions.remove(partitionName);
261           }
262         }
263       }
264     }
265     List<String> orphanedPartitionsList = new ArrayList<String>();
266     orphanedPartitionsList.addAll(orphanedPartitions);
267     int maxPartitionsPerInstance = idealState.getMaxPartitionsPerInstance();
268     normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList, maxPartitionsPerInstance);
269     idealState.getRecord()
270               .setListFields(generateListFieldFromMasterAssignment(masterAssignmentMap,
271                                                                    replicas));
272 
273   }
274 
275   /**
276    * Given the current master assignment map and the partitions not hosted, generate an
277    * evenly distributed partition assignment map
278    * 
279    * @param masterAssignmentMap
280    *          current master assignment map
281    * @param orphanPartitions
282    *          partitions not hosted by any instance
283    * @return
284    */
285   private void normalizeAssignmentMap(Map<String, List<String>> masterAssignmentMap,
286                                       List<String> orphanPartitions, int maxPartitionsPerInstance)
287   {
288     int totalPartitions = 0;
289     String[] instanceNames = new String[masterAssignmentMap.size()];
290     masterAssignmentMap.keySet().toArray(instanceNames);
291     Arrays.sort(instanceNames);
292     // Find out total partition number
293     for (String key : masterAssignmentMap.keySet())
294     {
295       totalPartitions += masterAssignmentMap.get(key).size();
296       Collections.sort(masterAssignmentMap.get(key));
297     }
298     totalPartitions += orphanPartitions.size();
299 
300     // Find out how many partitions an instance should host
301     int partitionNumber = totalPartitions / masterAssignmentMap.size();
302     int leave = totalPartitions % masterAssignmentMap.size();
303 
304     for (int i = 0; i < instanceNames.length; i++)
305     {
306       int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
307       leave--;
308       // For hosts that has more partitions, move those partitions to "orphaned"
309       while (masterAssignmentMap.get(instanceNames[i]).size() > targetPartitionNo)
310       {
311         int lastElementIndex = masterAssignmentMap.get(instanceNames[i]).size() - 1;
312         orphanPartitions.add(masterAssignmentMap.get(instanceNames[i])
313                                                 .get(lastElementIndex));
314         masterAssignmentMap.get(instanceNames[i]).remove(lastElementIndex);
315       }
316     }
317     leave = totalPartitions % masterAssignmentMap.size();
318     Collections.sort(orphanPartitions);
319     // Assign "orphaned" partitions to hosts that do not have enough partitions
320     for (int i = 0; i < instanceNames.length; i++)
321     {
322       int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
323       leave--;
324       if(targetPartitionNo > maxPartitionsPerInstance)
325       {
326         targetPartitionNo = maxPartitionsPerInstance;
327       }
328       while (masterAssignmentMap.get(instanceNames[i]).size() < targetPartitionNo)
329       {
330         int lastElementIndex = orphanPartitions.size() - 1;
331         masterAssignmentMap.get(instanceNames[i])
332                            .add(orphanPartitions.get(lastElementIndex));
333         orphanPartitions.remove(lastElementIndex);
334       }
335     }
336     if (orphanPartitions.size() > 0)
337     {
338       logger.warn("orphanPartitions still contains elements");
339     }
340   }
341 
342   /**
343    * Generate full preference list from the master assignment map evenly distribute the
344    * slave partitions mastered on a host to other hosts
345    * 
346    * @param masterAssignmentMap
347    *          current master assignment map
348    * @param orphanPartitions
349    *          partitions not hosted by any instance
350    * @return
351    */
352   Map<String, List<String>> generateListFieldFromMasterAssignment(Map<String, List<String>> masterAssignmentMap,
353                                                                   int replicas)
354   {
355     Map<String, List<String>> listFields = new HashMap<String, List<String>>();
356     int slaves = replicas - 1;
357     String[] instanceNames = new String[masterAssignmentMap.size()];
358     masterAssignmentMap.keySet().toArray(instanceNames);
359     Arrays.sort(instanceNames);
360 
361     for (int i = 0; i < instanceNames.length; i++)
362     {
363       String instanceName = instanceNames[i];
364       List<String> otherInstances = new ArrayList<String>(masterAssignmentMap.size() - 1);
365       for (int x = 0; x < instanceNames.length - 1; x++)
366       {
367         int index = (x + i + 1) % instanceNames.length;
368         otherInstances.add(instanceNames[index]);
369       }
370 
371       List<String> partitionList = masterAssignmentMap.get(instanceName);
372       for (int j = 0; j < partitionList.size(); j++)
373       {
374         String partitionName = partitionList.get(j);
375         listFields.put(partitionName, new ArrayList<String>());
376         listFields.get(partitionName).add(instanceName);
377 
378         int slavesCanAssign = Math.min(slaves, otherInstances.size());
379         for (int k = 0; k < slavesCanAssign; k++)
380         {
381           int index = (j + k + 1) % otherInstances.size();
382           listFields.get(partitionName).add(otherInstances.get(index));
383         }
384       }
385     }
386     return listFields;
387   }
388 
389   /**
390    * compute best state for resource in AUTO ideal state mode
391    * 
392    * @param cache
393    * @param stateModelDef
394    * @param instancePreferenceList
395    * @param currentStateMap
396    *          : instance->state for each partition
397    * @param disabledInstancesForPartition
398    * @return
399    */
400   private Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
401                                                                StateModelDefinition stateModelDef,
402                                                                List<String> instancePreferenceList,
403                                                                Map<String, String> currentStateMap,
404                                                                Set<String> disabledInstancesForPartition)
405   {
406     Map<String, String> instanceStateMap = new HashMap<String, String>();
407 
408     // if the ideal state is deleted, instancePreferenceList will be empty and
409     // we should drop all resources.
410     if (currentStateMap != null)
411     {
412       for (String instance : currentStateMap.keySet())
413       {
414         if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
415             && !disabledInstancesForPartition.contains(instance))
416         {
417           // if dropped and not disabled, transit to DROPPED
418           instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
419         }
420         else if ( (currentStateMap.get(instance) == null 
421             || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()))
422             && disabledInstancesForPartition.contains(instance))
423         {
424           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
425           instanceStateMap.put(instance, stateModelDef.getInitialState());
426         }
427       }
428     }
429 
430     // ideal state is deleted
431     if (instancePreferenceList == null)
432     {
433       return instanceStateMap;
434     }
435 
436     List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
437     boolean assigned[] = new boolean[instancePreferenceList.size()];
438 
439     Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
440 
441     for (String state : statesPriorityList)
442     {
443       String num = stateModelDef.getNumInstancesPerState(state);
444       int stateCount = -1;
445       if ("N".equals(num))
446       {
447         Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
448         liveAndEnabled.removeAll(disabledInstancesForPartition);
449         stateCount = liveAndEnabled.size();
450       }
451       else if ("R".equals(num))
452       {
453         stateCount = instancePreferenceList.size();
454       }
455       else
456       {
457         try
458         {
459           stateCount = Integer.parseInt(num);
460         }
461         catch (Exception e)
462         {
463           logger.error("Invalid count for state:" + state + " ,count=" + num);
464         }
465       }
466       if (stateCount > -1)
467       {
468         int count = 0;
469         for (int i = 0; i < instancePreferenceList.size(); i++)
470         {
471           String instanceName = instancePreferenceList.get(i);
472 
473           boolean notInErrorState = currentStateMap == null 
474               || currentStateMap.get(instanceName) == null
475               || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
476 
477           if (liveInstancesMap.containsKey(instanceName) && !assigned[i]
478               && notInErrorState && !disabledInstancesForPartition.contains(instanceName))
479           {
480             instanceStateMap.put(instanceName, state);
481             count = count + 1;
482             assigned[i] = true;
483             if (count == stateCount)
484             {
485               break;
486             }
487           }
488         }
489       }
490     }
491     return instanceStateMap;
492   }
493 
494   /**
495    * compute best state for resource in CUSTOMIZED ideal state mode
496    * 
497    * @param cache
498    * @param stateModelDef
499    * @param idealStateMap
500    * @param currentStateMap
501    * @param disabledInstancesForPartition
502    * @return
503    */
504   private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
505                                                                      StateModelDefinition stateModelDef,
506                                                                      Map<String, String> idealStateMap,
507                                                                      Map<String, String> currentStateMap,
508                                                                      Set<String> disabledInstancesForPartition)
509   {
510     Map<String, String> instanceStateMap = new HashMap<String, String>();
511 
512     // if the ideal state is deleted, idealStateMap will be null/empty and
513     // we should drop all resources.
514     if (currentStateMap != null)
515     {
516       for (String instance : currentStateMap.keySet())
517       {
518         if ((idealStateMap == null || !idealStateMap.containsKey(instance))
519             && !disabledInstancesForPartition.contains(instance))
520         {
521           // if dropped and not disabled, transit to DROPPED
522           instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
523         }
524         else if ( (currentStateMap.get(instance) == null 
525             || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()))
526             && disabledInstancesForPartition.contains(instance))
527         {
528           // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE)
529           instanceStateMap.put(instance, stateModelDef.getInitialState());
530         }
531       }
532     }
533 
534     // ideal state is deleted
535     if (idealStateMap == null)
536     {
537       return instanceStateMap;
538     }
539 
540     Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
541     for (String instance : idealStateMap.keySet())
542     {
543       boolean notInErrorState = currentStateMap == null 
544           || currentStateMap.get(instance) == null
545           || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
546 
547       if (liveInstancesMap.containsKey(instance) && notInErrorState
548           && !disabledInstancesForPartition.contains(instance))
549       {
550         instanceStateMap.put(instance, idealStateMap.get(instance));
551       }
552     }
553 
554     return instanceStateMap;
555   }
556 
557   private List<String> getPreferenceList(ClusterDataCache cache,
558                                          Partition resource,
559                                          IdealState idealState,
560                                          StateModelDefinition stateModelDef)
561   {
562     List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
563 
564     if (listField != null && listField.size() == 1
565         && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0)))
566     {
567       Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
568       List<String> prefList = new ArrayList<String>(liveInstances.keySet());
569       Collections.sort(prefList);
570       return prefList;
571     }
572     else
573     {
574       return listField;
575     }
576   }
577 }