View Javadoc

1   package org.apache.helix.tools;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Random;
27  import java.util.TreeMap;
28  
29  import org.apache.helix.HelixException;
30  import org.apache.helix.ZNRecord;
31  import org.apache.helix.model.IdealState.IdealStateProperty;
32  
33  
34  /**
35   * DefaultIdealStateCalculator tries to optimally allocate master/slave partitions among
36   * espresso storage nodes.
37   *
38   * Given a batch of storage nodes, the partition and replication factor, the algorithm first given a initial state
39   * When new batches of storage nodes are added, the algorithm will calculate the new ideal state such that the total
40   * partition movements are minimized.
41   *
42   */
43  public class DefaultIdealStateCalculator
44  {
45    static final String _MasterAssignmentMap = "MasterAssignmentMap";
46    static final String _SlaveAssignmentMap = "SlaveAssignmentMap";
47    static final String _partitions = "partitions";
48    static final String _replicas = "replicas";
49  
50    /**
51     * Calculate the initial ideal state given a batch of storage instances, the replication factor and
52     * number of partitions
53     *
54     * 1. Calculate the master assignment by random shuffling
55     * 2. for each storage instance, calculate the 1st slave assignment map, by another random shuffling
56     * 3. for each storage instance, calculate the i-th slave assignment map
57     * 4. Combine the i-th slave assignment maps together
58     *
59     * @param instanceNames
60     *          list of storage node instances
61     * @param partitions
62     *          number of partitions
63     * @param replicas
64     *          The number of replicas (slave partitions) per master partition
65     * @param masterStateValue
66     *          master state value: e.g. "MASTER" or "LEADER"
67     * @param slaveStateValue
68     *          slave state value: e.g. "SLAVE" or "STANDBY"
69     * @param resourceName
70     * @return a ZNRecord that contain the idealstate info
71     */
72    public static ZNRecord calculateIdealState(List<String> instanceNames, int partitions, int replicas, String resourceName,
73                                               String masterStateValue, String slaveStateValue)
74    {
75      Collections.sort(instanceNames);
76      if(instanceNames.size() < replicas + 1)
77      {
78        throw new HelixException("Number of instances must not be less than replicas + 1. "
79                                        + "instanceNr:" + instanceNames.size()
80                                        + ", replicas:" + replicas);
81      }
82      else if(partitions < instanceNames.size())
83      {
84        ZNRecord idealState = IdealStateCalculatorByShuffling.calculateIdealState(instanceNames, partitions, replicas, resourceName, 12345, masterStateValue, slaveStateValue);
85        int i = 0;
86        for(String partitionId : idealState.getMapFields().keySet())
87        {
88          Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
89          List<String> partitionAssignmentPriorityList = new ArrayList<String>();
90          String masterInstance = "";
91          for(String instanceName : partitionAssignmentMap.keySet())
92          {
93            if(partitionAssignmentMap.get(instanceName).equalsIgnoreCase(masterStateValue)
94                && masterInstance.equals(""))
95            {
96              masterInstance = instanceName;
97            }
98            else
99            {
100             partitionAssignmentPriorityList.add(instanceName);
101           }
102         }
103         Collections.shuffle(partitionAssignmentPriorityList, new Random(i++));
104         partitionAssignmentPriorityList.add(0, masterInstance);
105         idealState.setListField(partitionId, partitionAssignmentPriorityList);
106       }
107       return idealState;
108     }
109 
110     Map<String, Object> result = calculateInitialIdealState(instanceNames, partitions, replicas);
111 
112     return convertToZNRecord(result, resourceName, masterStateValue, slaveStateValue);
113   }
114 
115   public static ZNRecord calculateIdealStateBatch(List<List<String>> instanceBatches, int partitions, int replicas, String resourceName,
116                                                   String masterStateValue, String slaveStateValue)
117   {
118     Map<String, Object> result = calculateInitialIdealState(instanceBatches.get(0), partitions, replicas);
119 
120     for(int i = 1; i < instanceBatches.size(); i++)
121     {
122       result = calculateNextIdealState(instanceBatches.get(i), result);
123     }
124 
125     return convertToZNRecord(result, resourceName, masterStateValue, slaveStateValue);
126   }
127 
128   /**
129    * Convert the internal result (stored as a Map<String, Object>) into ZNRecord.
130    */
131   public static ZNRecord convertToZNRecord(Map<String, Object> result, String resourceName,
132                                     String masterStateValue, String slaveStateValue)
133   {
134     Map<String, List<Integer>> nodeMasterAssignmentMap
135     = (Map<String, List<Integer>>) (result.get(_MasterAssignmentMap));
136     Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
137         = (Map<String, Map<String, List<Integer>>>)(result.get(_SlaveAssignmentMap));
138 
139     int partitions = (Integer)(result.get("partitions"));
140 
141     ZNRecord idealState = new ZNRecord(resourceName);
142     idealState.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
143 
144 
145     for(String instanceName : nodeMasterAssignmentMap.keySet())
146     {
147       for(Integer partitionId : nodeMasterAssignmentMap.get(instanceName))
148       {
149         String partitionName = resourceName+"_"+partitionId;
150         if(!idealState.getMapFields().containsKey(partitionName))
151         {
152           idealState.setMapField(partitionName, new TreeMap<String, String>());
153         }
154         idealState.getMapField(partitionName).put(instanceName, masterStateValue);
155       }
156     }
157 
158     for(String instanceName : nodeSlaveAssignmentMap.keySet())
159     {
160       Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
161 
162       for(String slaveNode: slaveAssignmentMap.keySet())
163       {
164         List<Integer> slaveAssignment = slaveAssignmentMap.get(slaveNode);
165         for(Integer partitionId: slaveAssignment)
166         {
167           String partitionName = resourceName+"_"+partitionId;
168           idealState.getMapField(partitionName).put(slaveNode, slaveStateValue);
169         }
170       }
171     }
172     // generate the priority list of instances per partition. Master should be at front and slave follows.
173 
174     for(String partitionId : idealState.getMapFields().keySet())
175     {
176       Map<String, String> partitionAssignmentMap = idealState.getMapField(partitionId);
177       List<String> partitionAssignmentPriorityList = new ArrayList<String>();
178       String masterInstance = "";
179       for(String instanceName : partitionAssignmentMap.keySet())
180       {
181         if(partitionAssignmentMap.get(instanceName).equalsIgnoreCase(masterStateValue)
182             && masterInstance.equals(""))
183         {
184           masterInstance = instanceName;
185         }
186         else
187         {
188           partitionAssignmentPriorityList.add(instanceName);
189         }
190       }
191       Collections.shuffle(partitionAssignmentPriorityList);
192       partitionAssignmentPriorityList.add(0, masterInstance);
193       idealState.setListField(partitionId, partitionAssignmentPriorityList);
194     }
195     assert(result.containsKey("replicas"));
196     idealState.setSimpleField(IdealStateProperty.REPLICAS.toString(), result.get("replicas").toString());
197     return idealState;
198   }
199   /**
200    * Calculate the initial ideal state given a batch of storage instances, the replication factor and
201    * number of partitions
202    *
203    * 1. Calculate the master assignment by random shuffling
204    * 2. for each storage instance, calculate the 1st slave assignment map, by another random shuffling
205    * 3. for each storage instance, calculate the i-th slave assignment map
206    * 4. Combine the i-th slave assignment maps together
207    *
208    * @param instanceNames
209    *          list of storage node instances
210    * @param weight
211    *          weight for the initial storage node (each node has the same weight)
212    * @param partitions
213    *          number of partitions
214    * @param replicas
215    *          The number of replicas (slave partitions) per master partition
216    * @return a map that contain the idealstate info
217    */
218   public static Map<String, Object> calculateInitialIdealState(List<String> instanceNames, int partitions, int replicas)
219   {
220     Random r = new Random(54321);
221     assert(replicas <= instanceNames.size() - 1);
222 
223     ArrayList<Integer> masterPartitionAssignment = new ArrayList<Integer>();
224     for(int i = 0;i< partitions; i++)
225     {
226       masterPartitionAssignment.add(i);
227     }
228     // shuffle the partition id array
229     Collections.shuffle(masterPartitionAssignment, new Random(r.nextInt()));
230 
231     // 1. Generate the random master partition assignment
232     //    instanceName -> List of master partitions on that instance
233     Map<String, List<Integer>> nodeMasterAssignmentMap = new TreeMap<String, List<Integer>>();
234     for(int i = 0; i < masterPartitionAssignment.size(); i++)
235     {
236       String instanceName = instanceNames.get(i % instanceNames.size());
237       if(!nodeMasterAssignmentMap.containsKey(instanceName))
238       {
239         nodeMasterAssignmentMap.put(instanceName, new ArrayList<Integer>());
240       }
241       nodeMasterAssignmentMap.get(instanceName).add(masterPartitionAssignment.get(i));
242     }
243 
244     // instanceName -> slave assignment for its master partitions
245     // slave assignment: instanceName -> list of slave partitions on it
246     List<Map<String, Map<String, List<Integer>>>> nodeSlaveAssignmentMapsList = new ArrayList<Map<String, Map<String, List<Integer>>>>(replicas);
247 
248     Map<String, Map<String, List<Integer>>> firstNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
249     Map<String, Map<String, List<Integer>>> combinedNodeSlaveAssignmentMap = new TreeMap<String, Map<String, List<Integer>>>();
250 
251     if(replicas > 0)
252     {
253       // 2. For each node, calculate the evenly distributed slave as the first slave assignment
254       // We will figure out the 2nd ...replicas-th slave assignment based on the first level slave assignment
255       for(int i = 0; i < instanceNames.size(); i++)
256       {
257         List<String> slaveInstances = new ArrayList<String>();
258         ArrayList<Integer> slaveAssignment = new ArrayList<Integer>();
259         TreeMap<String, List<Integer>> slaveAssignmentMap = new TreeMap<String, List<Integer>>();
260 
261         for(int j = 0;j < instanceNames.size(); j++)
262         {
263           if(j != i)
264           {
265             slaveInstances.add(instanceNames.get(j));
266             slaveAssignmentMap.put(instanceNames.get(j), new ArrayList<Integer>());
267           }
268         }
269         // Get the number of master partitions on instanceName
270         List<Integer> masterAssignment =  nodeMasterAssignmentMap.get(instanceNames.get(i));
271         // do a random shuffling as in step 1, so that the first-level slave are distributed among rest instances
272 
273 
274         for(int j = 0;j < masterAssignment.size(); j++)
275         {
276           slaveAssignment.add(j);
277         }
278         Collections.shuffle(slaveAssignment, new Random(r.nextInt()));
279 
280         Collections.shuffle(slaveInstances, new Random(instanceNames.get(i).hashCode()));
281 
282         // Get the slave assignment map of node instanceName
283         for(int j = 0;j < masterAssignment.size(); j++)
284         {
285           String slaveInstanceName = slaveInstances.get(slaveAssignment.get(j) % slaveInstances.size());
286           if(!slaveAssignmentMap.containsKey(slaveInstanceName))
287           {
288             slaveAssignmentMap.put(slaveInstanceName, new ArrayList<Integer>());
289           }
290           slaveAssignmentMap.get(slaveInstanceName).add(masterAssignment.get(j));
291         }
292         firstNodeSlaveAssignmentMap.put(instanceNames.get(i), slaveAssignmentMap);
293       }
294       nodeSlaveAssignmentMapsList.add(firstNodeSlaveAssignmentMap);
295       // From the first slave assignment map, calculate the rest slave assignment maps
296       for(int replicaOrder = 1; replicaOrder < replicas; replicaOrder++)
297       {
298         // calculate the next slave partition assignment map
299         Map<String, Map<String, List<Integer>>> nextNodeSlaveAssignmentMap
300           = calculateNextSlaveAssignemntMap(firstNodeSlaveAssignmentMap, replicaOrder);
301         nodeSlaveAssignmentMapsList.add(nextNodeSlaveAssignmentMap);
302       }
303 
304       // Combine the calculated 1...replicas-th slave assignment map together
305 
306       for(String instanceName : nodeMasterAssignmentMap.keySet())
307       {
308         Map<String, List<Integer>> combinedSlaveAssignmentMap =  new TreeMap<String, List<Integer>>();
309 
310         for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap : nodeSlaveAssignmentMapsList)
311         {
312           Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
313 
314           for(String slaveInstance : slaveAssignmentMap.keySet())
315           {
316             if(!combinedSlaveAssignmentMap.containsKey(slaveInstance))
317             {
318               combinedSlaveAssignmentMap.put(slaveInstance, new ArrayList<Integer>());
319             }
320             combinedSlaveAssignmentMap.get(slaveInstance).addAll(slaveAssignmentMap.get(slaveInstance));
321           }
322         }
323         migrateSlaveAssignMapToNewInstances(combinedSlaveAssignmentMap, new ArrayList<String>());
324         combinedNodeSlaveAssignmentMap.put(instanceName, combinedSlaveAssignmentMap);
325       }
326     }
327     /*
328     // Print the result master and slave assignment maps
329     System.out.println("Master assignment:");
330     for(String instanceName : nodeMasterAssignmentMap.keySet())
331     {
332       System.out.println(instanceName+":");
333       for(Integer x : nodeMasterAssignmentMap.get(instanceName))
334       {
335         System.out.print(x+" ");
336       }
337       System.out.println();
338       System.out.println("Slave assignment:");
339 
340       int slaveOrder = 1;
341       for(Map<String, Map<String, List<Integer>>> slaveNodeAssignmentMap : nodeSlaveAssignmentMapsList)
342       {
343         System.out.println("Slave assignment order :" + (slaveOrder++));
344         Map<String, List<Integer>> slaveAssignmentMap = slaveNodeAssignmentMap.get(instanceName);
345         for(String slaveName : slaveAssignmentMap.keySet())
346         {
347           System.out.print("\t" + slaveName +":\n\t" );
348           for(Integer x : slaveAssignmentMap.get(slaveName))
349           {
350             System.out.print(x + " ");
351           }
352           System.out.println("\n");
353         }
354       }
355       System.out.println("\nCombined slave assignment map");
356       Map<String, List<Integer>> slaveAssignmentMap = combinedNodeSlaveAssignmentMap.get(instanceName);
357       for(String slaveName : slaveAssignmentMap.keySet())
358       {
359         System.out.print("\t" + slaveName +":\n\t" );
360         for(Integer x : slaveAssignmentMap.get(slaveName))
361         {
362           System.out.print(x + " ");
363         }
364         System.out.println("\n");
365       }
366     }*/
367     Map<String, Object> result = new TreeMap<String, Object>();
368     result.put("MasterAssignmentMap", nodeMasterAssignmentMap);
369     result.put("SlaveAssignmentMap", combinedNodeSlaveAssignmentMap);
370     result.put("replicas", new Integer(replicas));
371     result.put("partitions", new Integer(partitions));
372     return result;
373   }
374   /**
375    * In the case there are more than 1 slave, we use the following algorithm to calculate the n-th slave
376    * assignment map based on the first level slave assignment map.
377    *
378    * @param firstInstanceSlaveAssignmentMap  the first slave assignment map for all instances
379    * @param order of the slave
380    * @return the n-th slave assignment map for all the instances
381    * */
382   static Map<String, Map<String, List<Integer>>> calculateNextSlaveAssignemntMap(Map<String, Map<String, List<Integer>>> firstInstanceSlaveAssignmentMap, int replicaOrder)
383   {
384     Map<String, Map<String, List<Integer>>> result = new TreeMap<String, Map<String, List<Integer>>>();
385 
386     for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
387     {
388       Map<String, List<Integer>> resultAssignmentMap = new TreeMap<String, List<Integer>>();
389       result.put(currentInstance, resultAssignmentMap);
390     }
391 
392     for(String currentInstance : firstInstanceSlaveAssignmentMap.keySet())
393     {
394       Map<String, List<Integer>> previousSlaveAssignmentMap = firstInstanceSlaveAssignmentMap.get(currentInstance);
395       Map<String, List<Integer>> resultAssignmentMap = result.get(currentInstance);
396       int offset = replicaOrder - 1;
397       for(String instance : previousSlaveAssignmentMap.keySet())
398       {
399         List<String> otherInstances = new ArrayList<String>(previousSlaveAssignmentMap.size() - 1);
400         // Obtain an array of other instances
401         for(String otherInstance : previousSlaveAssignmentMap.keySet())
402         {
403           otherInstances.add(otherInstance);
404         }
405         Collections.sort(otherInstances);
406         int instanceIndex = -1;
407         for(int index = 0;index < otherInstances.size(); index++)
408         {
409           if(otherInstances.get(index).equalsIgnoreCase(instance))
410           {
411             instanceIndex = index;
412           }
413         }
414         assert(instanceIndex >= 0);
415         if(instanceIndex == otherInstances.size() - 1)
416         {
417           instanceIndex --;
418         }
419         // Since we need to evenly distribute the slaves on "instance" to other partitions, we
420         // need to remove "instance" from the array.
421         otherInstances.remove(instance);
422 
423         // distribute previous slave assignment to other instances.
424         List<Integer> previousAssignmentList = previousSlaveAssignmentMap.get(instance);
425         for(int i = 0; i < previousAssignmentList.size(); i++)
426         {
427 
428           // Evenly distribute the previousAssignmentList to the remaining other instances
429           int newInstanceIndex = (i + offset + instanceIndex) % otherInstances.size();
430           String newInstance = otherInstances.get(newInstanceIndex);
431           if(!resultAssignmentMap.containsKey(newInstance))
432           {
433             resultAssignmentMap.put(newInstance, new ArrayList<Integer>());
434           }
435           resultAssignmentMap.get(newInstance).add(previousAssignmentList.get(i));
436         }
437       }
438     }
439     return result;
440   }
441 
442   /**
443    * Given the current idealState, and the list of new Instances needed to be added, calculate the
444    * new Ideal state.
445    *
446    * 1. Calculate how many master partitions should be moved to the new cluster of instances
447    * 2. assign the number of master partitions px to be moved to each previous node
448    * 3. for each previous node,
449    *    3.1 randomly choose px nodes, move them to temp list
450    *    3.2 for each px nodes, remove them from the slave assignment map; record the map position of
451    *        the partition;
452    *    3.3 calculate # of new nodes that should be put in the slave assignment map
453    *    3.4 even-fy the slave assignment map;
454    *    3.5 randomly place # of new nodes that should be placed in
455    *
456    * 4. from all the temp master node list get from 3.1,
457    *    4.1 randomly assign them to nodes in the new cluster
458    *
459    * 5. for each node in the new cluster,
460    *    5.1 assemble the slave assignment map
461    *    5.2 even-fy the slave assignment map
462    *
463    * @param newInstances
464    *          list of new added storage node instances
465    * @param weight
466    *          weight for the new storage nodes (each node has the same weight)
467    * @param previousIdealState
468    *          The previous ideal state
469    * @return a map that contain the updated idealstate info
470    * */
471   public static Map<String, Object> calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState)
472   {
473     // Obtain the master / slave assignment info maps
474     Collections.sort(newInstances);
475     Map<String, List<Integer>> previousMasterAssignmentMap
476         = (Map<String, List<Integer>>) (previousIdealState.get("MasterAssignmentMap"));
477     Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap
478         = (Map<String, Map<String, List<Integer>>>)(previousIdealState.get("SlaveAssignmentMap"));
479 
480     List<String> oldInstances = new ArrayList<String>();
481     for(String oldInstance : previousMasterAssignmentMap.keySet())
482     {
483       oldInstances.add(oldInstance);
484     }
485 
486     int previousInstanceNum = previousMasterAssignmentMap.size();
487     int partitions = (Integer)(previousIdealState.get("partitions"));
488 
489     // TODO: take weight into account when calculate this
490 
491     int totalMasterParitionsToMove
492         = partitions * (newInstances.size()) / (previousInstanceNum + newInstances.size());
493     int numMastersFromEachNode = totalMasterParitionsToMove / previousInstanceNum;
494     int remain = totalMasterParitionsToMove % previousInstanceNum;
495 
496     // Note that when remain > 0, we should make [remain] moves with (numMastersFromEachNode + 1) partitions.
497     // And we should first choose those (numMastersFromEachNode + 1) moves from the instances that has more
498     // master partitions
499     List<Integer> masterPartitionListToMove = new ArrayList<Integer>();
500 
501     // For corresponding moved slave partitions, keep track of their original location; the new node does not
502     // need to migrate all of them.
503     Map<String, List<Integer>> slavePartitionsToMoveMap = new TreeMap<String, List<Integer>>();
504 
505     // Make sure that the instances that holds more master partitions are put in front
506     List<String> bigList = new ArrayList<String>(), smallList = new ArrayList<String>();
507     for(String oldInstance : previousMasterAssignmentMap.keySet())
508     {
509       List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
510       if(masterAssignmentList.size() > numMastersFromEachNode)
511       {
512         bigList.add(oldInstance);
513       }
514       else
515       {
516         smallList.add(oldInstance);
517       }
518     }
519     // "sort" the list, such that the nodes that has more master partitions moves more partitions to the
520     // new added batch of instances.
521     bigList.addAll(smallList);
522     int totalSlaveMoves = 0;
523     for(String oldInstance : bigList)
524     {
525       List<Integer> masterAssignmentList = previousMasterAssignmentMap.get(oldInstance);
526       int numToChoose = numMastersFromEachNode;
527       if(remain > 0)
528       {
529         numToChoose = numMastersFromEachNode + 1;
530         remain --;
531       }
532       // randomly remove numToChoose of master partitions to the new added nodes
533       ArrayList<Integer> masterPartionsMoved = new ArrayList<Integer>();
534       randomSelect(masterAssignmentList, masterPartionsMoved, numToChoose);
535 
536       masterPartitionListToMove.addAll(masterPartionsMoved);
537       Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(oldInstance);
538       removeFromSlaveAssignmentMap(slaveAssignmentMap, masterPartionsMoved, slavePartitionsToMoveMap);
539 
540       // Make sure that for old instances, the slave placement map is evenly distributed
541       // Trace the "local slave moves", which should together contribute to most of the slave migrations
542       int movesWithinInstance = migrateSlaveAssignMapToNewInstances(slaveAssignmentMap, newInstances);
543       // System.out.println("local moves: "+ movesWithinInstance);
544       totalSlaveMoves += movesWithinInstance;
545     }
546     // System.out.println("local slave moves total: "+ totalSlaveMoves);
547     // calculate the master /slave assignment for the new added nodes
548 
549     // We already have the list of master partitions that will migrate to new batch of instances,
550     // shuffle the partitions and assign them to new instances
551     Collections.shuffle(masterPartitionListToMove, new Random(12345));
552     for(int i = 0;i < newInstances.size(); i++)
553     {
554       String newInstance = newInstances.get(i);
555       List<Integer> masterPartitionList = new ArrayList<Integer>();
556       for(int j = 0;j < masterPartitionListToMove.size(); j++)
557       {
558         if(j % newInstances.size() == i)
559         {
560           masterPartitionList.add(masterPartitionListToMove.get(j));
561         }
562       }
563 
564       Map<String, List<Integer>> slavePartitionMap = new TreeMap<String, List<Integer>>();
565       for(String oldInstance : oldInstances)
566       {
567         slavePartitionMap.put(oldInstance, new ArrayList<Integer>());
568       }
569       // Build the slave assignment map for the new instance, based on the saved information
570       // about those slave partition locations in slavePartitionsToMoveMap
571       for(Integer x : masterPartitionList)
572       {
573         for(String oldInstance : slavePartitionsToMoveMap.keySet())
574         {
575           List<Integer> slaves = slavePartitionsToMoveMap.get(oldInstance);
576           if(slaves.contains(x))
577           {
578             slavePartitionMap.get(oldInstance).add(x);
579           }
580         }
581       }
582       // add entry for other new instances into the slavePartitionMap
583       List<String> otherNewInstances = new ArrayList<String>();
584       for(String instance : newInstances)
585       {
586         if(!instance.equalsIgnoreCase(newInstance))
587         {
588           otherNewInstances.add(instance);
589         }
590       }
591       // Make sure that slave partitions are evenly distributed
592       migrateSlaveAssignMapToNewInstances(slavePartitionMap, otherNewInstances);
593 
594       // Update the result in the result map. We can reuse the input previousIdealState map as
595       // the result.
596       previousMasterAssignmentMap.put(newInstance, masterPartitionList);
597       nodeSlaveAssignmentMap.put(newInstance, slavePartitionMap);
598 
599     }
600     /*
601     // Print content of the master/ slave assignment maps
602     for(String instanceName : previousMasterAssignmentMap.keySet())
603     {
604       System.out.println(instanceName+":");
605       for(Integer x : previousMasterAssignmentMap.get(instanceName))
606       {
607         System.out.print(x+" ");
608       }
609       System.out.println("\nmaster partition moved:");
610 
611       System.out.println();
612       System.out.println("Slave assignment:");
613 
614       Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
615       for(String slaveName : slaveAssignmentMap.keySet())
616       {
617         System.out.print("\t" + slaveName +":\n\t" );
618         for(Integer x : slaveAssignmentMap.get(slaveName))
619         {
620           System.out.print(x + " ");
621         }
622         System.out.println("\n");
623       }
624     }
625 
626     System.out.println("Master partitions migrated to new instances");
627     for(Integer x : masterPartitionListToMove)
628     {
629         System.out.print(x+" ");
630     }
631     System.out.println();
632 
633     System.out.println("Slave partitions migrated to new instances");
634     for(String oldInstance : slavePartitionsToMoveMap.keySet())
635     {
636         System.out.print(oldInstance + ": ");
637         for(Integer x : slavePartitionsToMoveMap.get(oldInstance))
638         {
639           System.out.print(x+" ");
640         }
641         System.out.println();
642     }
643     */
644     return previousIdealState;
645   }
646   
647   public ZNRecord calculateNextIdealState(List<String> newInstances, Map<String, Object> previousIdealState,
648        String resourceName, String masterStateValue, String slaveStateValue)
649   {
650     return convertToZNRecord(calculateNextIdealState(newInstances, previousIdealState), resourceName, masterStateValue, slaveStateValue);
651   }
652   /**
653    * Given the list of master partition that will be migrated away from the storage instance,
654    * Remove their entries from the local instance slave assignment map.
655    *
656    * @param slaveAssignmentMap  the local instance slave assignment map
657    * @param masterPartionsMoved the list of master partition ids that will be migrated away
658    * @param removedAssignmentMap keep track of the removed slave assignment info. The info can be
659    *        used by new added storage nodes.
660    * */
661   static void removeFromSlaveAssignmentMap( Map<String, List<Integer>>slaveAssignmentMap, List<Integer> masterPartionsMoved, Map<String, List<Integer>> removedAssignmentMap)
662   {
663     for(String instanceName : slaveAssignmentMap.keySet())
664     {
665       List<Integer> slaveAssignment = slaveAssignmentMap.get(instanceName);
666       for(Integer partitionId: masterPartionsMoved)
667       {
668         if(slaveAssignment.contains(partitionId))
669         {
670           slaveAssignment.remove(partitionId);
671           if(!removedAssignmentMap.containsKey(instanceName))
672           {
673             removedAssignmentMap.put(instanceName, new ArrayList<Integer>());
674           }
675           removedAssignmentMap.get(instanceName).add(partitionId);
676         }
677       }
678     }
679   }
680 
681   /**
682    * Since some new storage instances are added, each existing storage instance should migrate some
683    * slave partitions to the new added instances.
684    *
685    * The algorithm keeps moving one partition to from the instance that hosts most slave partitions
686    * to the instance that hosts least number of partitions, until max-min <= 1.
687    *
688    * In this way we can guarantee that all instances hosts almost same number of slave partitions, also
689    * slave partitions are evenly distributed.
690    *
691    * @param slaveAssignmentMap  the local instance slave assignment map
692    * @param masterPartionsMoved the list of master partition ids that will be migrated away
693    * @param removedAssignmentMap keep track of the removed slave assignment info. The info can be
694    *        used by new added storage nodes.
695    * */
696   static int migrateSlaveAssignMapToNewInstances(Map<String, List<Integer>> nodeSlaveAssignmentMap, List<String> newInstances)
697   {
698     int moves = 0;
699     boolean done = false;
700     for(String newInstance : newInstances)
701     {
702       nodeSlaveAssignmentMap.put(newInstance, new ArrayList<Integer>());
703     }
704     while(!done)
705     {
706       List<Integer> maxAssignment = null, minAssignment = null;
707       int minCount = Integer.MAX_VALUE, maxCount = Integer.MIN_VALUE;
708       String minInstance = "";
709       for(String instanceName : nodeSlaveAssignmentMap.keySet())
710       {
711         List<Integer> slaveAssignment = nodeSlaveAssignmentMap.get(instanceName);
712         if(minCount > slaveAssignment.size())
713         {
714           minCount = slaveAssignment.size();
715           minAssignment = slaveAssignment;
716           minInstance = instanceName;
717         }
718         if(maxCount < slaveAssignment.size())
719         {
720           maxCount = slaveAssignment.size();
721           maxAssignment = slaveAssignment;
722         }
723       }
724       if(maxCount - minCount <= 1 )
725       {
726         done = true;
727       }
728       else
729       {
730         int indexToMove = -1;
731         // find a partition that is not contained in the minAssignment list
732         for(int i = 0; i < maxAssignment.size(); i++ )
733         {
734           if(!minAssignment.contains(maxAssignment.get(i)))
735           {
736             indexToMove = i;
737             break;
738           }
739         }
740 
741         minAssignment.add(maxAssignment.get(indexToMove));
742         maxAssignment.remove(indexToMove);
743 
744         if(newInstances.contains(minInstance))
745         {
746           moves++;
747         }
748       }
749     }
750     return moves;
751   }
752 
753   /**
754    * Randomly select a number of elements from original list and put them in the selectedList
755    * The algorithm is used to select master partitions to be migrated when new instances are added.
756    *
757    *
758    * @param originalList  the original list
759    * @param selectedList  the list that contain selected elements
760    * @param num number of elements to be selected
761    * */
762   static void randomSelect(List<Integer> originalList, List<Integer> selectedList, int num)
763   {
764     assert(originalList.size() >= num);
765     int[] indexArray = new int[originalList.size()];
766     for(int i = 0;i < indexArray.length; i++)
767     {
768       indexArray[i] = i;
769     }
770     int numRemains = originalList.size();
771     Random r = new Random(numRemains);
772     for(int j = 0;j < num; j++)
773     {
774       int randIndex = r.nextInt(numRemains);
775       selectedList.add(originalList.get(randIndex));
776       originalList.remove(randIndex);
777       numRemains --;
778     }
779   }
780 
781   public static void main(String args[])
782   {
783     List<String> instanceNames = new ArrayList<String>();
784     for(int i = 0;i < 10; i++)
785     {
786       instanceNames.add("localhost:123" + i);
787     }
788     int partitions = 48*3, replicas = 3;
789     Map<String, Object> resultOriginal = DefaultIdealStateCalculator.calculateInitialIdealState(instanceNames, partitions, replicas);
790 
791   }
792 }