View Javadoc

1   package org.apache.helix.controller.stages;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.TreeMap;
28  
29  import org.apache.helix.controller.pipeline.AbstractBaseStage;
30  import org.apache.helix.controller.pipeline.StageException;
31  import org.apache.helix.model.IdealState;
32  import org.apache.helix.model.LiveInstance;
33  import org.apache.helix.model.Message;
34  import org.apache.helix.model.Partition;
35  import org.apache.helix.model.Resource;
36  import org.apache.helix.model.StateModelDefinition;
37  import org.apache.log4j.Logger;
38  
39  
40  public class MessageSelectionStage extends AbstractBaseStage
41  {
42    private static final Logger LOG = Logger.getLogger(MessageSelectionStage.class);
43  
44    public static class Bounds
45    {
46      private int upper;
47      private int lower;
48  
49      public Bounds(int lower, int upper)
50      {
51        this.lower = lower;
52        this.upper = upper;
53      }
54  
55      public void increaseUpperBound()
56      {
57        upper++;
58      }
59  
60      public void increaseLowerBound()
61      {
62        lower++;
63      }
64  
65      public void decreaseUpperBound()
66      {
67        upper--;
68      }
69  
70      public void decreaseLowerBound()
71      {
72        lower--;
73      }
74  
75      public int getLowerBound()
76      {
77        return lower;
78      }
79  
80      public int getUpperBound()
81      {
82        return upper;
83      }
84    }
85  
86    @Override
87    public void process(ClusterEvent event) throws Exception
88    {
89      ClusterDataCache cache = event.getAttribute("ClusterDataCache");
90      Map<String, Resource> resourceMap =
91          event.getAttribute(AttributeName.RESOURCES.toString());
92      CurrentStateOutput currentStateOutput =
93          event.getAttribute(AttributeName.CURRENT_STATE.toString());
94      MessageGenerationOutput messageGenOutput =
95          event.getAttribute(AttributeName.MESSAGES_ALL.toString());
96      if (cache == null || resourceMap == null || currentStateOutput == null
97          || messageGenOutput == null)
98      {
99        throw new StageException("Missing attributes in event:" + event
100           + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
101     }
102 
103     MessageSelectionStageOutput output = new MessageSelectionStageOutput();
104 
105     for (String resourceName : resourceMap.keySet())
106     {
107       Resource resource = resourceMap.get(resourceName);
108       StateModelDefinition stateModelDef =
109           cache.getStateModelDef(resource.getStateModelDefRef());
110 
111       Map<String, Integer> stateTransitionPriorities =
112           getStateTransitionPriorityMap(stateModelDef);
113       IdealState idealState = cache.getIdealState(resourceName);
114       Map<String, Bounds> stateConstraints =
115           computeStateConstraints(stateModelDef, idealState, cache);
116 
117       for (Partition partition : resource.getPartitions())
118       {
119         List<Message> messages = messageGenOutput.getMessages(resourceName, partition);
120         List<Message> selectedMessages =
121             selectMessages(cache.getLiveInstances(),
122                            currentStateOutput.getCurrentStateMap(resourceName, partition),
123                            currentStateOutput.getPendingStateMap(resourceName, partition),
124                            messages,
125                            stateConstraints,
126                            stateTransitionPriorities,
127                            stateModelDef.getInitialState());
128         output.addMessages(resourceName, partition, selectedMessages);
129       }
130     }
131     event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
132   }
133 
134   // TODO: This method deserves its own class. The class should not understand helix but
135   // just be
136   // able to solve the problem using the algo. I think the method is following that but if
137   // we don't move it to another class its quite easy to break that contract
138   /**
139    * greedy message selection algorithm: 1) calculate CS+PS state lower/upper-bounds 2)
140    * group messages by state transition and sorted by priority 3) from highest priority to
141    * lowest, for each message group with the same transition add message one by one and
142    * make sure state constraint is not violated update state lower/upper-bounds when a new
143    * message is selected
144    *
145    * @param currentStates
146    * @param pendingStates
147    * @param messages
148    * @param stateConstraints
149    *          : STATE -> bound (lower:upper)
150    * @param stateTransitionPriorities
151    *          : FROME_STATE-TO_STATE -> priority
152    * @return: selected messages
153    */
154   List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
155                                Map<String, String> currentStates,
156                                Map<String, String> pendingStates,
157                                List<Message> messages,
158                                Map<String, Bounds> stateConstraints,
159                                final Map<String, Integer> stateTransitionPriorities,
160                                String initialState)
161   {
162     if (messages == null || messages.isEmpty())
163     {
164       return Collections.emptyList();
165     }
166 
167     List<Message> selectedMessages = new ArrayList<Message>();
168     Map<String, Bounds> bounds = new HashMap<String, Bounds>();
169 
170     // count currentState, if no currentState, count as in initialState
171     for (String instance : liveInstances.keySet())
172     {
173       String state = initialState;
174       if (currentStates.containsKey(instance))
175       {
176         state = currentStates.get(instance);
177       }
178 
179       if (!bounds.containsKey(state))
180       {
181         bounds.put(state, new Bounds(0, 0));
182       }
183       bounds.get(state).increaseLowerBound();
184       bounds.get(state).increaseUpperBound();
185     }
186 
187     // count pendingStates
188     for (String instance : pendingStates.keySet())
189     {
190       String state = pendingStates.get(instance);
191       if (!bounds.containsKey(state))
192       {
193         bounds.put(state, new Bounds(0, 0));
194       }
195       // TODO: add lower bound, need to refactor pendingState to include fromState also
196       bounds.get(state).increaseUpperBound();
197     }
198 
199     // group messages based on state transition priority
200     Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
201         new TreeMap<Integer, List<Message>>();
202     for (Message message : messages)
203     {
204       String fromState = message.getFromState();
205       String toState = message.getToState();
206       String transition = fromState + "-" + toState;
207       int priority = Integer.MAX_VALUE;
208 
209       if (stateTransitionPriorities.containsKey(transition))
210       {
211         priority = stateTransitionPriorities.get(transition);
212       }
213 
214       if (!messagesGroupByStateTransitPriority.containsKey(priority))
215       {
216         messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
217       }
218       messagesGroupByStateTransitPriority.get(priority).add(message);
219     }
220 
221     // select messages
222     for (List<Message> messageList : messagesGroupByStateTransitPriority.values())
223     {
224       for (Message message : messageList)
225       {
226         String fromState = message.getFromState();
227         String toState = message.getToState();
228 
229         if (!bounds.containsKey(fromState))
230         {
231           LOG.error("Message's fromState is not in currentState. message: " + message);
232           continue;
233         }
234 
235         if (!bounds.containsKey(toState))
236         {
237           bounds.put(toState, new Bounds(0, 0));
238         }
239 
240         // check lower bound of fromState
241         if (stateConstraints.containsKey(fromState))
242         {
243           int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
244           if (newLowerBound < 0)
245           {
246             LOG.error("Number of currentState in " + fromState
247                 + " is less than number of messages transiting from " + fromState);
248             continue;
249           }
250 
251           if (newLowerBound < stateConstraints.get(fromState).getLowerBound())
252           {
253             continue;
254           }
255         }
256 
257         // check upper bound of toState
258         if (stateConstraints.containsKey(toState))
259         {
260           int newUpperBound = bounds.get(toState).getUpperBound() + 1;
261           if (newUpperBound > stateConstraints.get(toState).getUpperBound())
262           {
263             continue;
264           }
265         }
266 
267         selectedMessages.add(message);
268         bounds.get(fromState).increaseLowerBound();
269         bounds.get(toState).increaseUpperBound();
270       }
271     }
272 
273     return selectedMessages;
274   }
275 
276   /**
277    * TODO: This code is duplicate in multiple places. Can we do it in to one place in the
278    * beginning and compute the stateConstraint instance once and re use at other places.
279    * Each IdealState must have a constraint object associated with it
280    */
281   private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
282                                                       IdealState idealState,
283                                                       ClusterDataCache cache)
284   {
285     Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
286 
287     List<String> statePriorityList = stateModelDefinition.getStatesPriorityList();
288     for (String state : statePriorityList)
289     {
290       String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
291       int max = -1;
292       if ("N".equals(numInstancesPerState))
293       {
294         max = cache.getLiveInstances().size();
295       }
296       else if ("R".equals(numInstancesPerState))
297       {
298         // idealState is null when resource has been dropped,
299         // R can't be evaluated and ignore state constraints
300         if (idealState != null)
301         {
302           max = cache.getReplicas(idealState.getResourceName());
303         }
304       }
305       else
306       {
307         try
308         {
309           max = Integer.parseInt(numInstancesPerState);
310         }
311         catch (Exception e)
312         {
313           // use -1
314         }
315       }
316 
317       if (max > -1)
318       {
319         // if state has no constraint, will not put in map
320         stateConstraints.put(state, new Bounds(0, max));
321       }
322     }
323 
324     return stateConstraints;
325   }
326 
327   // TODO: if state transition priority is not provided then use lexicographical sorting
328   // so that behavior is consistent
329   private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef)
330   {
331     Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
332     List<String> stateTransitionPriorityList =
333         stateModelDef.getStateTransitionPriorityList();
334     for (int i = 0; i < stateTransitionPriorityList.size(); i++)
335     {
336       stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
337     }
338 
339     return stateTransitionPriorities;
340   }
341 }