1 package org.apache.helix.controller.stages;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.TreeMap;
31
32 import org.apache.helix.HelixConstants;
33 import org.apache.helix.HelixDefinedState;
34 import org.apache.helix.HelixManager;
35 import org.apache.helix.ZNRecord;
36 import org.apache.helix.HelixConstants.StateModelToken;
37 import org.apache.helix.controller.pipeline.AbstractBaseStage;
38 import org.apache.helix.controller.pipeline.StageException;
39 import org.apache.helix.model.CurrentState;
40 import org.apache.helix.model.IdealState;
41 import org.apache.helix.model.LiveInstance;
42 import org.apache.helix.model.Partition;
43 import org.apache.helix.model.Resource;
44 import org.apache.helix.model.StateModelDefinition;
45 import org.apache.helix.model.IdealState.IdealStateModeProperty;
46 import org.apache.log4j.Logger;
47
48
49
50
51
52
53
54
55 public class BestPossibleStateCalcStage extends AbstractBaseStage
56 {
57 private static final Logger logger =
58 Logger.getLogger(BestPossibleStateCalcStage.class.getName());
59
60 @Override
61 public void process(ClusterEvent event) throws Exception
62 {
63 long startTime = System.currentTimeMillis();
64 logger.info("START BestPossibleStateCalcStage.process()");
65
66 CurrentStateOutput currentStateOutput =
67 event.getAttribute(AttributeName.CURRENT_STATE.toString());
68 Map<String, Resource> resourceMap =
69 event.getAttribute(AttributeName.RESOURCES.toString());
70 ClusterDataCache cache = event.getAttribute("ClusterDataCache");
71
72 if (currentStateOutput == null || resourceMap == null || cache == null)
73 {
74 throw new StageException("Missing attributes in event:" + event
75 + ". Requires CURRENT_STATE|RESOURCES|DataCache");
76 }
77
78 BestPossibleStateOutput bestPossibleStateOutput =
79 compute(event, resourceMap, currentStateOutput);
80 event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(),
81 bestPossibleStateOutput);
82
83 long endTime = System.currentTimeMillis();
84 logger.info("END BestPossibleStateCalcStage.process(). took: "
85 + (endTime - startTime) + " ms");
86 }
87
88 private BestPossibleStateOutput compute(ClusterEvent event,
89 Map<String, Resource> resourceMap,
90 CurrentStateOutput currentStateOutput)
91 {
92
93
94
95
96
97 ClusterDataCache cache = event.getAttribute("ClusterDataCache");
98 HelixManager manager = event.getAttribute("helixmanager");
99
100 BestPossibleStateOutput output = new BestPossibleStateOutput();
101
102 for (String resourceName : resourceMap.keySet())
103 {
104 logger.debug("Processing resource:" + resourceName);
105
106 Resource resource = resourceMap.get(resourceName);
107
108
109 IdealState idealState = cache.getIdealState(resourceName);
110
111 String stateModelDefName;
112
113 if (idealState == null)
114 {
115
116 logger.info("resource:" + resourceName + " does not exist anymore");
117 stateModelDefName = currentStateOutput.getResourceStateModelDef(resourceName);
118 idealState = new IdealState(resourceName);
119 }
120 else
121 {
122 stateModelDefName = idealState.getStateModelDefRef();
123 }
124
125 StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
126 if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO_REBALANCE)
127 {
128 calculateAutoBalancedIdealState(cache,
129 idealState,
130 stateModelDef,
131 currentStateOutput);
132 }
133
134
135 for (Partition partition : resource.getPartitions())
136 {
137 Map<String, String> currentStateMap =
138 currentStateOutput.getCurrentStateMap(resourceName, partition);
139
140 Map<String, String> bestStateForPartition;
141 Set<String> disabledInstancesForPartition =
142 cache.getDisabledInstancesForPartition(partition.toString());
143
144 if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
145 {
146 Map<String, String> idealStateMap =
147 idealState.getInstanceStateMap(partition.getPartitionName());
148 bestStateForPartition =
149 computeCustomizedBestStateForPartition(cache,
150 stateModelDef,
151 idealStateMap,
152 currentStateMap,
153 disabledInstancesForPartition);
154 }
155 else
156
157 {
158 List<String> instancePreferenceList =
159 getPreferenceList(cache, partition, idealState, stateModelDef);
160
161 bestStateForPartition =
162 computeAutoBestStateForPartition(cache,
163 stateModelDef,
164 instancePreferenceList,
165 currentStateMap,
166 disabledInstancesForPartition);
167 }
168 output.setState(resourceName, partition, bestStateForPartition);
169 }
170 }
171 return output;
172 }
173
174
175
176
177
178
179
180
181
182
183
184
185
186 private void calculateAutoBalancedIdealState(ClusterDataCache cache,
187 IdealState idealState,
188 StateModelDefinition stateModelDef,
189 CurrentStateOutput currentStateOutput)
190 {
191 String topStateValue = stateModelDef.getStatesPriorityList().get(0);
192 Set<String> liveInstances = cache._liveInstanceMap.keySet();
193 Set<String> taggedInstances = new HashSet<String>();
194
195
196 if(idealState.getInstanceGroupTag() != null)
197 {
198 for(String instanceName : liveInstances)
199 {
200 if(cache._instanceConfigMap.get(instanceName).containsTag(idealState.getInstanceGroupTag()))
201 {
202 taggedInstances.add(instanceName);
203 }
204 }
205 }
206 if(taggedInstances.size() > 0)
207 {
208 logger.info("found the following instances with tag " + idealState.getResourceName() + " " + taggedInstances);
209 liveInstances = taggedInstances;
210 }
211
212 int replicas = 1;
213 try
214 {
215 replicas = Integer.parseInt(idealState.getReplicas());
216 }
217 catch (Exception e)
218 {
219 logger.error("", e);
220 }
221
222 Map<String, List<String>> defaultListFields = new TreeMap<String, List<String>>();
223 List<String> emptyList = new ArrayList<String>(0);
224 for (String partition : idealState.getPartitionSet())
225 {
226 defaultListFields.put(partition, emptyList);
227 }
228 idealState.getRecord().setListFields(defaultListFields);
229
230 if (liveInstances.size() == 0)
231 {
232 logger.info("No live instances, return. Idealstate : "
233 + idealState.getResourceName());
234 return;
235 }
236 Map<String, List<String>> masterAssignmentMap = new HashMap<String, List<String>>();
237 for (String instanceName : liveInstances)
238 {
239 masterAssignmentMap.put(instanceName, new ArrayList<String>());
240 }
241 Set<String> orphanedPartitions = new HashSet<String>();
242 orphanedPartitions.addAll(idealState.getPartitionSet());
243
244 for (String liveInstanceName : liveInstances)
245 {
246 CurrentState currentState =
247 cache.getCurrentState(liveInstanceName,
248 cache.getLiveInstances()
249 .get(liveInstanceName)
250 .getSessionId()).get(idealState.getId());
251 if (currentState != null)
252 {
253 Map<String, String> partitionStates = currentState.getPartitionStateMap();
254 for (String partitionName : partitionStates.keySet())
255 {
256 String state = partitionStates.get(partitionName);
257 if (state.equals(topStateValue))
258 {
259 masterAssignmentMap.get(liveInstanceName).add(partitionName);
260 orphanedPartitions.remove(partitionName);
261 }
262 }
263 }
264 }
265 List<String> orphanedPartitionsList = new ArrayList<String>();
266 orphanedPartitionsList.addAll(orphanedPartitions);
267 int maxPartitionsPerInstance = idealState.getMaxPartitionsPerInstance();
268 normalizeAssignmentMap(masterAssignmentMap, orphanedPartitionsList, maxPartitionsPerInstance);
269 idealState.getRecord()
270 .setListFields(generateListFieldFromMasterAssignment(masterAssignmentMap,
271 replicas));
272
273 }
274
275
276
277
278
279
280
281
282
283
284
285 private void normalizeAssignmentMap(Map<String, List<String>> masterAssignmentMap,
286 List<String> orphanPartitions, int maxPartitionsPerInstance)
287 {
288 int totalPartitions = 0;
289 String[] instanceNames = new String[masterAssignmentMap.size()];
290 masterAssignmentMap.keySet().toArray(instanceNames);
291 Arrays.sort(instanceNames);
292
293 for (String key : masterAssignmentMap.keySet())
294 {
295 totalPartitions += masterAssignmentMap.get(key).size();
296 Collections.sort(masterAssignmentMap.get(key));
297 }
298 totalPartitions += orphanPartitions.size();
299
300
301 int partitionNumber = totalPartitions / masterAssignmentMap.size();
302 int leave = totalPartitions % masterAssignmentMap.size();
303
304 for (int i = 0; i < instanceNames.length; i++)
305 {
306 int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
307 leave--;
308
309 while (masterAssignmentMap.get(instanceNames[i]).size() > targetPartitionNo)
310 {
311 int lastElementIndex = masterAssignmentMap.get(instanceNames[i]).size() - 1;
312 orphanPartitions.add(masterAssignmentMap.get(instanceNames[i])
313 .get(lastElementIndex));
314 masterAssignmentMap.get(instanceNames[i]).remove(lastElementIndex);
315 }
316 }
317 leave = totalPartitions % masterAssignmentMap.size();
318 Collections.sort(orphanPartitions);
319
320 for (int i = 0; i < instanceNames.length; i++)
321 {
322 int targetPartitionNo = leave > 0 ? (partitionNumber + 1) : partitionNumber;
323 leave--;
324 if(targetPartitionNo > maxPartitionsPerInstance)
325 {
326 targetPartitionNo = maxPartitionsPerInstance;
327 }
328 while (masterAssignmentMap.get(instanceNames[i]).size() < targetPartitionNo)
329 {
330 int lastElementIndex = orphanPartitions.size() - 1;
331 masterAssignmentMap.get(instanceNames[i])
332 .add(orphanPartitions.get(lastElementIndex));
333 orphanPartitions.remove(lastElementIndex);
334 }
335 }
336 if (orphanPartitions.size() > 0)
337 {
338 logger.warn("orphanPartitions still contains elements");
339 }
340 }
341
342
343
344
345
346
347
348
349
350
351
352 Map<String, List<String>> generateListFieldFromMasterAssignment(Map<String, List<String>> masterAssignmentMap,
353 int replicas)
354 {
355 Map<String, List<String>> listFields = new HashMap<String, List<String>>();
356 int slaves = replicas - 1;
357 String[] instanceNames = new String[masterAssignmentMap.size()];
358 masterAssignmentMap.keySet().toArray(instanceNames);
359 Arrays.sort(instanceNames);
360
361 for (int i = 0; i < instanceNames.length; i++)
362 {
363 String instanceName = instanceNames[i];
364 List<String> otherInstances = new ArrayList<String>(masterAssignmentMap.size() - 1);
365 for (int x = 0; x < instanceNames.length - 1; x++)
366 {
367 int index = (x + i + 1) % instanceNames.length;
368 otherInstances.add(instanceNames[index]);
369 }
370
371 List<String> partitionList = masterAssignmentMap.get(instanceName);
372 for (int j = 0; j < partitionList.size(); j++)
373 {
374 String partitionName = partitionList.get(j);
375 listFields.put(partitionName, new ArrayList<String>());
376 listFields.get(partitionName).add(instanceName);
377
378 int slavesCanAssign = Math.min(slaves, otherInstances.size());
379 for (int k = 0; k < slavesCanAssign; k++)
380 {
381 int index = (j + k + 1) % otherInstances.size();
382 listFields.get(partitionName).add(otherInstances.get(index));
383 }
384 }
385 }
386 return listFields;
387 }
388
389
390
391
392
393
394
395
396
397
398
399
400 private Map<String, String> computeAutoBestStateForPartition(ClusterDataCache cache,
401 StateModelDefinition stateModelDef,
402 List<String> instancePreferenceList,
403 Map<String, String> currentStateMap,
404 Set<String> disabledInstancesForPartition)
405 {
406 Map<String, String> instanceStateMap = new HashMap<String, String>();
407
408
409
410 if (currentStateMap != null)
411 {
412 for (String instance : currentStateMap.keySet())
413 {
414 if ((instancePreferenceList == null || !instancePreferenceList.contains(instance))
415 && !disabledInstancesForPartition.contains(instance))
416 {
417
418 instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
419 }
420 else if ( (currentStateMap.get(instance) == null
421 || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()))
422 && disabledInstancesForPartition.contains(instance))
423 {
424
425 instanceStateMap.put(instance, stateModelDef.getInitialState());
426 }
427 }
428 }
429
430
431 if (instancePreferenceList == null)
432 {
433 return instanceStateMap;
434 }
435
436 List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
437 boolean assigned[] = new boolean[instancePreferenceList.size()];
438
439 Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
440
441 for (String state : statesPriorityList)
442 {
443 String num = stateModelDef.getNumInstancesPerState(state);
444 int stateCount = -1;
445 if ("N".equals(num))
446 {
447 Set<String> liveAndEnabled = new HashSet<String>(liveInstancesMap.keySet());
448 liveAndEnabled.removeAll(disabledInstancesForPartition);
449 stateCount = liveAndEnabled.size();
450 }
451 else if ("R".equals(num))
452 {
453 stateCount = instancePreferenceList.size();
454 }
455 else
456 {
457 try
458 {
459 stateCount = Integer.parseInt(num);
460 }
461 catch (Exception e)
462 {
463 logger.error("Invalid count for state:" + state + " ,count=" + num);
464 }
465 }
466 if (stateCount > -1)
467 {
468 int count = 0;
469 for (int i = 0; i < instancePreferenceList.size(); i++)
470 {
471 String instanceName = instancePreferenceList.get(i);
472
473 boolean notInErrorState = currentStateMap == null
474 || currentStateMap.get(instanceName) == null
475 || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString());
476
477 if (liveInstancesMap.containsKey(instanceName) && !assigned[i]
478 && notInErrorState && !disabledInstancesForPartition.contains(instanceName))
479 {
480 instanceStateMap.put(instanceName, state);
481 count = count + 1;
482 assigned[i] = true;
483 if (count == stateCount)
484 {
485 break;
486 }
487 }
488 }
489 }
490 }
491 return instanceStateMap;
492 }
493
494
495
496
497
498
499
500
501
502
503
504 private Map<String, String> computeCustomizedBestStateForPartition(ClusterDataCache cache,
505 StateModelDefinition stateModelDef,
506 Map<String, String> idealStateMap,
507 Map<String, String> currentStateMap,
508 Set<String> disabledInstancesForPartition)
509 {
510 Map<String, String> instanceStateMap = new HashMap<String, String>();
511
512
513
514 if (currentStateMap != null)
515 {
516 for (String instance : currentStateMap.keySet())
517 {
518 if ((idealStateMap == null || !idealStateMap.containsKey(instance))
519 && !disabledInstancesForPartition.contains(instance))
520 {
521
522 instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString());
523 }
524 else if ( (currentStateMap.get(instance) == null
525 || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()))
526 && disabledInstancesForPartition.contains(instance))
527 {
528
529 instanceStateMap.put(instance, stateModelDef.getInitialState());
530 }
531 }
532 }
533
534
535 if (idealStateMap == null)
536 {
537 return instanceStateMap;
538 }
539
540 Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
541 for (String instance : idealStateMap.keySet())
542 {
543 boolean notInErrorState = currentStateMap == null
544 || currentStateMap.get(instance) == null
545 || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString());
546
547 if (liveInstancesMap.containsKey(instance) && notInErrorState
548 && !disabledInstancesForPartition.contains(instance))
549 {
550 instanceStateMap.put(instance, idealStateMap.get(instance));
551 }
552 }
553
554 return instanceStateMap;
555 }
556
557 private List<String> getPreferenceList(ClusterDataCache cache,
558 Partition resource,
559 IdealState idealState,
560 StateModelDefinition stateModelDef)
561 {
562 List<String> listField = idealState.getPreferenceList(resource.getPartitionName());
563
564 if (listField != null && listField.size() == 1
565 && StateModelToken.ANY_LIVEINSTANCE.toString().equals(listField.get(0)))
566 {
567 Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
568 List<String> prefList = new ArrayList<String>(liveInstances.keySet());
569 Collections.sort(prefList);
570 return prefList;
571 }
572 else
573 {
574 return listField;
575 }
576 }
577 }