1 package org.apache.helix.tools;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.TreeMap;
28
29 import org.apache.helix.HelixException;
30 import org.apache.helix.ZNRecord;
31 import org.apache.helix.model.IdealState.IdealStateProperty;
32
33
34
35
36
37
38
39
40
41
42
43 public class DefaultIdealStateCalculator
44 {
// Keys of the intermediate result map (Map<String, Object>) that the calculation methods
// of this class pass to each other.

// Key of the master assignment: instance name -> list of partition ids it masters.
static final String _MasterAssignmentMap = "MasterAssignmentMap";
// Key of the slave assignment: master instance name -> (slave instance name -> partition ids).
static final String _SlaveAssignmentMap = "SlaveAssignmentMap";
// Same text as the literal "partitions" key under which the partition count is stored.
// NOTE(review): the visible code uses the literal rather than this constant.
static final String _partitions = "partitions";
// Same text as the literal "replicas" key under which the replica count is stored.
// NOTE(review): the visible code uses the literal rather than this constant.
static final String _replicas = "replicas";
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions, int replicas, String resourceName,
73 String masterStateValue, String slaveStateValue)
74 {
75 Collections.sort(instanceNames);
76 if(instanceNames.size() < replicas + 1)
77 {
78 throw new HelixException("Number of instances must not be less than replicas + 1. "
79 + "instanceNr:" + instanceNames.size()
80 + ", replicas:" + replicas);
81 }
82 else if(partitions < instanceNames.size())
83 {
84 ZNRecord idealState = IdealStateCalculatorByShuffling.calculateIdealState(instanceNames, partitions, replicas, resourceName, 12345, masterStateValue, slaveStateValue);
85 int i = 0;
86 for(String partitionId : idealState.getMapFields().keySet())
87 {
88 Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
89 List<String> partitionAssignmentPriorityList = new ArrayList<String>();
90 String masterInstance = "";
91 for(String instanceName : partitionAssignmentMap.keySet())
92 {
93 if(partitionAssignmentMap.get(instanceName).equalsIgnoreCase(masterStateValue)
94 && masterInstance.equals(""))
95 {
96 masterInstance = instanceName;
97 }
98 else
99 {
100 partitionAssignmentPriorityList.add(instanceName);
101 }
102 }
103 Collections.shuffle(partitionAssignmentPriorityList, new Random(i++));
104 partitionAssignmentPriorityList.add(0, masterInstance);
105 idealState.setListField(partitionId, partitionAssignmentPriorityList);
106 }
107 return idealState;
108 }
109
110 Map<String, Object> result = calculateInitialIdealState(instanceNames, partitions, replicas);
111
112 return convertToZNRecord(result, resourceName, masterStateValue, slaveStateValue);
113 }
114
115 public static ZNRecord calculateIdealStateBatch(List<List<String>> instanceBatches, int partitions, int replicas, String resourceName,
116 String masterStateValue, String slaveStateValue)
117 {
118 Map<String, Object> result = calculateInitialIdealState(instanceBatches.get(0), partitions, replicas);
119
120 for(int i = 1; i < instanceBatches.size(); i++)
121 {
122 result = calculateNextIdealState(instanceBatches.get(i), result);
123 }
124
125 return convertToZNRecord(result, resourceName, masterStateValue, slaveStateValue);
126 }
127
128
129
130
131 public static ZNRecord convertToZNRecord(Map<String, Object> result, String resourceName,
132 String masterStateValue, String slaveStateValue)
133 {
134 Map<String, List<Integer>> nodeMasterAssignmentMap
135 = (Map<String, List<Integer>>) (result.get(_MasterAssignmentMap));
136 Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
137 = (Map<String, Map<String, List<Integer>>>)(result.get(_SlaveAssignmentMap));
138
139 int partitions = (Integer)(result.get("partitions"));
140
141 ZNRecord idealState = new ZNRecord(resourceName);
142 idealState.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
143
144
145 for(String instanceName : nodeMasterAssignmentMap.keySet())
146 {
147 for(Integer partitionId : nodeMasterAssignmentMap.get(instanceName))
148 {
149 String partitionName = resourceName+"_"+partitionId;
150 if(!idealState.getMapFields().containsKey(partitionName))
151 {
152 idealState.setMapField(partitionName, new TreeMap<String, String>());
153 }
154 idealState.getMapField(partitionName).put(instanceName, masterStateValue);
155 }
156 }
157
158 for(String instanceName : nodeSlaveAssignmentMap.keySet())
159 {
160 Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
161
162 for(String slaveNode: slaveAssignmentMap.keySet())
163 {
164 List<Integer> slaveAssignment = slaveAssignmentMap.get(slaveNode);
165 for(Integer partitionId: slaveAssignment)
166 {
167 String partitionName = resourceName+"_"+partitionId;
168 idealState.getMapField(partitionName).put(slaveNode, slaveStateValue);
169 }
170 }
171 }
172
173
174 for(String partitionId : idealState.getMapFields().keySet())
175 {
176 Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
177 List<String> partitionAssignmentPriorityList = new ArrayList<String>();
178 String masterInstance = "";
179 for(String instanceName : partitionAssignmentMap.keySet())
180 {
181 if(partitionAssignmentMap.get(instanceName).equalsIgnoreCase(masterStateValue)
182 && masterInstance.equals(""))
183 {
184 masterInstance = instanceName;
185 }
186 else
187 {
188 partitionAssignmentPriorityList.add(instanceName);
189 }
190 }
191 Collections.shuffle(partitionAssignmentPriorityList);
192 partitionAssignmentPriorityList.add(0, masterInstance);
193 idealState.setListField(partitionId, partitionAssignmentPriorityList);
194 }
195 assert(result.containsKey("replicas"));
196 idealState.setSimpleField(IdealStateProperty.REPLICAS.toString(), result.get("replicas").toString());
197 return idealState;
198 }
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218 public static Map<String, Object> calculateInitialIdealState(List<String> instanceNames, int partitions, int replicas)
219 {
220 Random r = new Random(54321);
221 assert(replicas <= instanceNames.size() - 1);
222
223 ArrayList<Integer> masterPartitionAssignment = new ArrayList<Integer>();
224 for(int i = 0;i< partitions; i++)
225 {
226 masterPartitionAssignment.add(i);
227 }
228
229 Collections.shuffle(masterPartitionAssignment, new Random(r.nextInt()));
230
231
232
233 Map<String, List<Integer>> nodeMasterAssignmentMap = new TreeMap<String, List<Integer>>();
234 for(int i = 0; i < masterPartitionAssignment.size(); i++)
235 {
236 String instanceName = instanceNames.get(i % instanceNames.size());
237 if(!nodeMasterAssignmentMap.containsKey(instanceName))
238 {
239 nodeMasterAssignmentMap.put(instanceName, new ArrayList<Integer>());
240 }
241 nodeMasterAssignmentMap.get(instanceName).add(masterPartitionAssignment.get(i));
242 }
243
244
245
246 List<Map<String, Map<String, List<Integer>>>> nodeSlaveAssignmentMapsList = new ArrayList<Map<String, Map<String, List<Integer>>>>(replicas);
247
248 Map<String, Map<String, List<Integer>>> firstNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
249 Map<String, Map<String, List<Integer>>> combinedNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
250
251 if(replicas > 0)
252 {
253
254
255 for(int i = 0; i < instanceNames.size(); i++)
256 {
257 List<String> slaveInstances = new ArrayList<String>();
258 ArrayList<Integer> slaveAssignment = new ArrayList<Integer>();
259 TreeMap<String, List<Integer>> slaveAssignmentMap = new TreeMap<String, List<Integer>>();
260
261 for(int j = 0;j < instanceNames.size(); j++)
262 {
263 if(j != i)
264 {
265 slaveInstances.add(instanceNames.get(j));
266 slaveAssignmentMap.put(instanceNames.get(j), new ArrayList<Integer>());
267 }
268 }
269
270 List<Integer> masterAssignment = nodeMasterAssignmentMap.get(instanceNames.get(i));
271
272
273
274 for(int j = 0;j < masterAssignment.size(); j++)
275 {
276 slaveAssignment.add(j);
277 }
278 Collections.shuffle(slaveAssignment, new Random(r.nextInt()));
279
280 Collections.shuffle(slaveInstances, new Random(instanceNames.get(i).hashCode()));
281
282
283 for(int j = 0;j < masterAssignment.size(); j++)
284 {
285 String slaveInstanceName = slaveInstances.get(slaveAssignment.get(j) % slaveInstances.size());
286 if(!slaveAssignmentMap.containsKey(slaveInstanceName))
287 {
288 slaveAssignmentMap.put(slaveInstanceName, new ArrayList<Integer>());
289 }
290 slaveAssignmentMap.get(slaveInstanceName).add(masterAssignment.get(j));
291 }
292 firstNodeSlaveAssignmentMap.put(instanceNames.get(i), slaveAssignmentMap);
293 }
294 nodeSlaveAssignmentMapsList.add(firstNodeSlaveAssignmentMap);
295
296 for(int replicaOrder = 1; replicaOrder < replicas; replicaOrder++)
297 {
298
299 Map<String, Map<String, List<Integer>>> nextNodeSlaveAssignmentMap
300 = calculateNextSlaveAssignemntMap(firstNodeSlaveAssignmentMap, replicaOrder);
301 nodeSlaveAssignmentMapsList.add(nextNodeSlaveAssignmentMap);
302 }
303
304
305
306 for(String instanceName : nodeMasterAssignmentMap.keySet())
307 {
308 Map<String, List<Integer>> combinedSlaveAssignmentMap = new TreeMap<String, List<Integer>>();
309
310 for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap : nodeSlaveAssignmentMapsList)
311 {
312 Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
313
314 for(String slaveInstance : slaveAssignmentMap.keySet())
315 {
316 if(!combinedSlaveAssignmentMap.containsKey(slaveInstance))
317 {
318 combinedSlaveAssignmentMap.put(slaveInstance, new ArrayList<Integer>());
319 }
320 combinedSlaveAssignmentMap.get(slaveInstance).addAll(slaveAssignmentMap.get(slaveInstance));
321 }
322 }
323 migrateSlaveAssignMapToNewInstances(combinedSlaveAssignmentMap, new ArrayList<String>());
324 combinedNodeSlaveAssignmentMap.put(instanceName, combinedSlaveAssignmentMap);
325 }
326 }
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367 Map<String, Object> result = new TreeMap<String, Object>();
368 result.put("MasterAssignmentMap", nodeMasterAssignmentMap);
369 result.put("SlaveAssignmentMap", combinedNodeSlaveAssignmentMap);
370 result.put("replicas", new Integer(replicas));
371 result.put("partitions", new Integer(partitions));
372 return result;
373 }
374
375
376
377
378
379
380
381
382 static Map<String, Map<String, List<Integer>>> calculateNextSlaveAssignemntMap(Map<String, Map<String, List<Integer>>> firstInstanceSlaveAssignmentMap, int replicaOrder)
383 {
384 Map<String, Map<String, List<Integer>>> result = new TreeMap<String, Map<String, List<Integer>>>();
385
386 for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
387 {
388 Map<String, List<Integer>> resultAssignmentMap = new TreeMap<String, List<Integer>>();
389 result.put(currentInstance, resultAssignmentMap);
390 }
391
392 for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
393 {
394 Map<String, List<Integer>> previousSlaveAssignmentMap = firstInstanceSlaveAssignmentMap.get(currentInstance);
395 Map<String, List<Integer>> resultAssignmentMap = result.get(currentInstance);
396 int offset = replicaOrder - 1;
397 for(String instance : previousSlaveAssignmentMap.keySet())
398 {
399 List<String> otherInstances = new ArrayList<String>(previousSlaveAssignmentMap.size() - 1);
400
401 for(String otherInstance : previousSlaveAssignmentMap.keySet())
402 {
403 otherInstances.add(otherInstance);
404 }
405 Collections.sort(otherInstances);
406 int instanceIndex = -1;
407 for(int index = 0;index < otherInstances.size(); index++)
408 {
409 if(otherInstances.get(index).equalsIgnoreCase(instance))
410 {
411 instanceIndex = index;
412 }
413 }
414 assert(instanceIndex >= 0);
415 if(instanceIndex == otherInstances.size() - 1)
416 {
417 instanceIndex --;
418 }
419
420
421 otherInstances.remove(instance);
422
423
424 List<Integer> previousAssignmentList = previousSlaveAssignmentMap.get(instance);
425 for(int i = 0; i < previousAssignmentList.size(); i++)
426 {
427
428
429 int newInstanceIndex = (i + offset + instanceIndex) % otherInstances.size();
430 String newInstance = otherInstances.get(newInstanceIndex);
431 if(!resultAssignmentMap.containsKey(newInstance))
432 {
433 resultAssignmentMap.put(newInstance, new ArrayList<Integer>());
434 }
435 resultAssignmentMap.get(newInstance).add(previousAssignmentList.get(i));
436 }
437 }
438 }
439 return result;
440 }
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471 public static Map<String, Object> calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState)
472 {
473
474 Collections.sort(newInstances);
475 Map<String, List<Integer>> previousMasterAssignmentMap
476 = (Map<String, List<Integer>>) (previousIdealState.get("MasterAssignmentMap"));
477 Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
478 = (Map<String, Map<String, List<Integer>>>)(previousIdealState.get("SlaveAssignmentMap"));
479
480 List<String> oldInstances = new ArrayList<String>();
481 for(String oldInstance : previousMasterAssignmentMap.keySet())
482 {
483 oldInstances.add(oldInstance);
484 }
485
486 int previousInstanceNum = previousMasterAssignmentMap.size();
487 int partitions = (Integer)(previousIdealState.get("partitions"));
488
489
490
491 int totalMasterParitionsToMove
492 = partitions * (newInstances.size()) / (previousInstanceNum + newInstances.size());
493 int numMastersFromEachNode = totalMasterParitionsToMove / previousInstanceNum;
494 int remain = totalMasterParitionsToMove % previousInstanceNum;
495
496
497
498
499 List<Integer> masterPartitionListToMove = new ArrayList<Integer>();
500
501
502
503 Map<String, List<Integer>> slavePartitionsToMoveMap = new TreeMap<String, List<Integer>>();
504
505
506 List<String> bigList = new ArrayList<String>(), smallList = new ArrayList<String>();
507 for(String oldInstance : previousMasterAssignmentMap.keySet())
508 {
509 List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
510 if(masterAssignmentList.size() > numMastersFromEachNode)
511 {
512 bigList.add(oldInstance);
513 }
514 else
515 {
516 smallList.add(oldInstance);
517 }
518 }
519
520
521 bigList.addAll(smallList);
522 int totalSlaveMoves = 0;
523 for(String oldInstance : bigList)
524 {
525 List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
526 int numToChoose = numMastersFromEachNode;
527 if(remain > 0)
528 {
529 numToChoose = numMastersFromEachNode + 1;
530 remain --;
531 }
532
533 ArrayList<Integer> masterPartionsMoved = new ArrayList<Integer>();
534 randomSelect(masterAssignmentList, masterPartionsMoved, numToChoose);
535
536 masterPartitionListToMove.addAll(masterPartionsMoved);
537 Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(oldInstance);
538 removeFromSlaveAssignmentMap(slaveAssignmentMap, masterPartionsMoved, slavePartitionsToMoveMap);
539
540
541
542 int movesWithinInstance = migrateSlaveAssignMapToNewInstances(slaveAssignmentMap, newInstances);
543
544 totalSlaveMoves += movesWithinInstance;
545 }
546
547
548
549
550
551 Collections.shuffle(masterPartitionListToMove, new Random(12345));
552 for(int i = 0;i < newInstances.size(); i++)
553 {
554 String newInstance = newInstances.get(i);
555 List<Integer> masterPartitionList = new ArrayList<Integer>();
556 for(int j = 0;j < masterPartitionListToMove.size(); j++)
557 {
558 if(j % newInstances.size() == i)
559 {
560 masterPartitionList.add(masterPartitionListToMove.get(j));
561 }
562 }
563
564 Map<String, List<Integer>> slavePartitionMap = new TreeMap<String, List<Integer>>();
565 for(String oldInstance : oldInstances)
566 {
567 slavePartitionMap.put(oldInstance, new ArrayList<Integer>());
568 }
569
570
571 for(Integer x : masterPartitionList)
572 {
573 for(String oldInstance : slavePartitionsToMoveMap.keySet())
574 {
575 List<Integer> slaves = slavePartitionsToMoveMap.get(oldInstance);
576 if(slaves.contains(x))
577 {
578 slavePartitionMap.get(oldInstance).add(x);
579 }
580 }
581 }
582
583 List<String> otherNewInstances = new ArrayList<String>();
584 for(String instance : newInstances)
585 {
586 if(!instance.equalsIgnoreCase(newInstance))
587 {
588 otherNewInstances.add(instance);
589 }
590 }
591
592 migrateSlaveAssignMapToNewInstances(slavePartitionMap, otherNewInstances);
593
594
595
596 previousMasterAssignmentMap.put(newInstance, masterPartitionList);
597 nodeSlaveAssignmentMap.put(newInstance, slavePartitionMap);
598
599 }
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644 return previousIdealState;
645 }
646
647 public ZNRecord calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState,
648 String resourceName, String masterStateValue, String slaveStateValue)
649 {
650 return convertToZNRecord(calculateNextIdealState(newInstances, previousIdealState), resourceName, masterStateValue, slaveStateValue);
651 }
652
653
654
655
656
657
658
659
660
661 static void removeFromSlaveAssignmentMap( Map<String, List<Integer>>slaveAssignmentMap, List<Integer> masterPartionsMoved, Map<String, List<Integer>> removedAssignmentMap)
662 {
663 for(String instanceName : slaveAssignmentMap.keySet())
664 {
665 List<Integer> slaveAssignment = slaveAssignmentMap.get(instanceName);
666 for(Integer partitionId: masterPartionsMoved)
667 {
668 if(slaveAssignment.contains(partitionId))
669 {
670 slaveAssignment.remove(partitionId);
671 if(!removedAssignmentMap.containsKey(instanceName))
672 {
673 removedAssignmentMap.put(instanceName, new ArrayList<Integer>());
674 }
675 removedAssignmentMap.get(instanceName).add(partitionId);
676 }
677 }
678 }
679 }
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696 static int migrateSlaveAssignMapToNewInstances(Map<String, List<Integer>> nodeSlaveAssignmentMap, List<String> newInstances)
697 {
698 int moves = 0;
699 boolean done = false;
700 for(String newInstance : newInstances)
701 {
702 nodeSlaveAssignmentMap.put(newInstance, new ArrayList<Integer>());
703 }
704 while(!done)
705 {
706 List<Integer> maxAssignment = null, minAssignment = null;
707 int minCount = Integer.MAX_VALUE, maxCount = Integer.MIN_VALUE;
708 String minInstance = "";
709 for(String instanceName : nodeSlaveAssignmentMap.keySet())
710 {
711 List<Integer> slaveAssignment = nodeSlaveAssignmentMap.get(instanceName);
712 if(minCount > slaveAssignment.size())
713 {
714 minCount = slaveAssignment.size();
715 minAssignment = slaveAssignment;
716 minInstance = instanceName;
717 }
718 if(maxCount < slaveAssignment.size())
719 {
720 maxCount = slaveAssignment.size();
721 maxAssignment = slaveAssignment;
722 }
723 }
724 if(maxCount - minCount <= 1 )
725 {
726 done = true;
727 }
728 else
729 {
730 int indexToMove = -1;
731
732 for(int i = 0; i < maxAssignment.size(); i++ )
733 {
734 if(!minAssignment.contains(maxAssignment.get(i)))
735 {
736 indexToMove = i;
737 break;
738 }
739 }
740
741 minAssignment.add(maxAssignment.get(indexToMove));
742 maxAssignment.remove(indexToMove);
743
744 if(newInstances.contains(minInstance))
745 {
746 moves++;
747 }
748 }
749 }
750 return moves;
751 }
752
753
754
755
756
757
758
759
760
761
762 static void randomSelect(List<Integer> originalList, List<Integer> selectedList, int num)
763 {
764 assert(originalList.size() >= num);
765 int[] indexArray = new int[originalList.size()];
766 for(int i = 0;i < indexArray.length; i++)
767 {
768 indexArray[i] = i;
769 }
770 int numRemains = originalList.size();
771 Random r = new Random(numRemains);
772 for(int j = 0;j < num; j++)
773 {
774 int randIndex = r.nextInt(numRemains);
775 selectedList.add(originalList.get(randIndex));
776 originalList.remove(randIndex);
777 numRemains --;
778 }
779 }
780
781 public static void main(String args[])
782 {
783 List<String> instanceNames = new ArrayList<String>();
784 for(int i = 0;i < 10; i++)
785 {
786 instanceNames.add("localhost:123" + i);
787 }
788 int partitions = 48*3, replicas = 3;
789 Map<String, Object> resultOriginal = DefaultIdealStateCalculator.calculateInitialIdealState(instanceNames, partitions, replicas);
790
791 }
792 }