1 package org.apache.helix.controller.stages;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.TreeMap;
28
29 import org.apache.helix.controller.pipeline.AbstractBaseStage;
30 import org.apache.helix.controller.pipeline.StageException;
31 import org.apache.helix.model.IdealState;
32 import org.apache.helix.model.LiveInstance;
33 import org.apache.helix.model.Message;
34 import org.apache.helix.model.Partition;
35 import org.apache.helix.model.Resource;
36 import org.apache.helix.model.StateModelDefinition;
37 import org.apache.log4j.Logger;
38
39
40 public class MessageSelectionStage extends AbstractBaseStage
41 {
42 private static final Logger LOG = Logger.getLogger(MessageSelectionStage.class);
43
44 public static class Bounds
45 {
46 private int upper;
47 private int lower;
48
49 public Bounds(int lower, int upper)
50 {
51 this.lower = lower;
52 this.upper = upper;
53 }
54
55 public void increaseUpperBound()
56 {
57 upper++;
58 }
59
60 public void increaseLowerBound()
61 {
62 lower++;
63 }
64
65 public void decreaseUpperBound()
66 {
67 upper--;
68 }
69
70 public void decreaseLowerBound()
71 {
72 lower--;
73 }
74
75 public int getLowerBound()
76 {
77 return lower;
78 }
79
80 public int getUpperBound()
81 {
82 return upper;
83 }
84 }
85
86 @Override
87 public void process(ClusterEvent event) throws Exception
88 {
89 ClusterDataCache cache = event.getAttribute("ClusterDataCache");
90 Map<String, Resource> resourceMap =
91 event.getAttribute(AttributeName.RESOURCES.toString());
92 CurrentStateOutput currentStateOutput =
93 event.getAttribute(AttributeName.CURRENT_STATE.toString());
94 MessageGenerationOutput messageGenOutput =
95 event.getAttribute(AttributeName.MESSAGES_ALL.toString());
96 if (cache == null || resourceMap == null || currentStateOutput == null
97 || messageGenOutput == null)
98 {
99 throw new StageException("Missing attributes in event:" + event
100 + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
101 }
102
103 MessageSelectionStageOutput output = new MessageSelectionStageOutput();
104
105 for (String resourceName : resourceMap.keySet())
106 {
107 Resource resource = resourceMap.get(resourceName);
108 StateModelDefinition stateModelDef =
109 cache.getStateModelDef(resource.getStateModelDefRef());
110
111 Map<String, Integer> stateTransitionPriorities =
112 getStateTransitionPriorityMap(stateModelDef);
113 IdealState idealState = cache.getIdealState(resourceName);
114 Map<String, Bounds> stateConstraints =
115 computeStateConstraints(stateModelDef, idealState, cache);
116
117 for (Partition partition : resource.getPartitions())
118 {
119 List<Message> messages = messageGenOutput.getMessages(resourceName, partition);
120 List<Message> selectedMessages =
121 selectMessages(cache.getLiveInstances(),
122 currentStateOutput.getCurrentStateMap(resourceName, partition),
123 currentStateOutput.getPendingStateMap(resourceName, partition),
124 messages,
125 stateConstraints,
126 stateTransitionPriorities,
127 stateModelDef.getInitialState());
128 output.addMessages(resourceName, partition, selectedMessages);
129 }
130 }
131 event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
132 }
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154 List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
155 Map<String, String> currentStates,
156 Map<String, String> pendingStates,
157 List<Message> messages,
158 Map<String, Bounds> stateConstraints,
159 final Map<String, Integer> stateTransitionPriorities,
160 String initialState)
161 {
162 if (messages == null || messages.isEmpty())
163 {
164 return Collections.emptyList();
165 }
166
167 List<Message> selectedMessages = new ArrayList<Message>();
168 Map<String, Bounds> bounds = new HashMap<String, Bounds>();
169
170
171 for (String instance : liveInstances.keySet())
172 {
173 String state = initialState;
174 if (currentStates.containsKey(instance))
175 {
176 state = currentStates.get(instance);
177 }
178
179 if (!bounds.containsKey(state))
180 {
181 bounds.put(state, new Bounds(0, 0));
182 }
183 bounds.get(state).increaseLowerBound();
184 bounds.get(state).increaseUpperBound();
185 }
186
187
188 for (String instance : pendingStates.keySet())
189 {
190 String state = pendingStates.get(instance);
191 if (!bounds.containsKey(state))
192 {
193 bounds.put(state, new Bounds(0, 0));
194 }
195
196 bounds.get(state).increaseUpperBound();
197 }
198
199
200 Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
201 new TreeMap<Integer, List<Message>>();
202 for (Message message : messages)
203 {
204 String fromState = message.getFromState();
205 String toState = message.getToState();
206 String transition = fromState + "-" + toState;
207 int priority = Integer.MAX_VALUE;
208
209 if (stateTransitionPriorities.containsKey(transition))
210 {
211 priority = stateTransitionPriorities.get(transition);
212 }
213
214 if (!messagesGroupByStateTransitPriority.containsKey(priority))
215 {
216 messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
217 }
218 messagesGroupByStateTransitPriority.get(priority).add(message);
219 }
220
221
222 for (List<Message> messageList : messagesGroupByStateTransitPriority.values())
223 {
224 for (Message message : messageList)
225 {
226 String fromState = message.getFromState();
227 String toState = message.getToState();
228
229 if (!bounds.containsKey(fromState))
230 {
231 LOG.error("Message's fromState is not in currentState. message: " + message);
232 continue;
233 }
234
235 if (!bounds.containsKey(toState))
236 {
237 bounds.put(toState, new Bounds(0, 0));
238 }
239
240
241 if (stateConstraints.containsKey(fromState))
242 {
243 int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
244 if (newLowerBound < 0)
245 {
246 LOG.error("Number of currentState in " + fromState
247 + " is less than number of messages transiting from " + fromState);
248 continue;
249 }
250
251 if (newLowerBound < stateConstraints.get(fromState).getLowerBound())
252 {
253 continue;
254 }
255 }
256
257
258 if (stateConstraints.containsKey(toState))
259 {
260 int newUpperBound = bounds.get(toState).getUpperBound() + 1;
261 if (newUpperBound > stateConstraints.get(toState).getUpperBound())
262 {
263 continue;
264 }
265 }
266
267 selectedMessages.add(message);
268 bounds.get(fromState).increaseLowerBound();
269 bounds.get(toState).increaseUpperBound();
270 }
271 }
272
273 return selectedMessages;
274 }
275
276
277
278
279
280
281 private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
282 IdealState idealState,
283 ClusterDataCache cache)
284 {
285 Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
286
287 List<String> statePriorityList = stateModelDefinition.getStatesPriorityList();
288 for (String state : statePriorityList)
289 {
290 String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
291 int max = -1;
292 if ("N".equals(numInstancesPerState))
293 {
294 max = cache.getLiveInstances().size();
295 }
296 else if ("R".equals(numInstancesPerState))
297 {
298
299
300 if (idealState != null)
301 {
302 max = cache.getReplicas(idealState.getResourceName());
303 }
304 }
305 else
306 {
307 try
308 {
309 max = Integer.parseInt(numInstancesPerState);
310 }
311 catch (Exception e)
312 {
313
314 }
315 }
316
317 if (max > -1)
318 {
319
320 stateConstraints.put(state, new Bounds(0, max));
321 }
322 }
323
324 return stateConstraints;
325 }
326
327
328
329 private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef)
330 {
331 Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
332 List<String> stateTransitionPriorityList =
333 stateModelDef.getStateTransitionPriorityList();
334 for (int i = 0; i < stateTransitionPriorityList.size(); i++)
335 {
336 stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
337 }
338
339 return stateTransitionPriorities;
340 }
341 }