View Javadoc

1   package org.apache.helix.tools;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Random;
27  import java.util.TreeMap;
28  
29  import org.apache.helix.ZNRecord;
30  import org.apache.helix.model.IdealState.IdealStateProperty;
31  
32  
33  public class IdealStateCalculatorByRush
34  {
35    /**
36     * Build the config map for RUSH algorithm. The input of RUSH algorithm groups
37     * nodes into "cluster"s, and different clusters can be assigned with
38     * different weights.
39     *
40     * @param numClusters
41     *          number of node clusters
42     * @param instancesPerCluster
43     *          List of clusters, each contain a list of node name strings.
44     * @param replicationDegree
45     *          the replication degree
46     * @param clusterWeights
47     *          the weight for each node cluster
48     * @return this config map structure for RUSH algorithm.
49     */
50    static HashMap<String, Object> buildRushConfig(int numClusters,
51        List<List<String>> instancesPerCluster, int replicationDegree,
52        List<Integer> clusterWeights)
53    {
54      HashMap<String, Object> config = new HashMap<String, Object>();
55      config.put("replicationDegree", replicationDegree);
56  
57      HashMap[] clusterList = new HashMap[numClusters];
58      config.put("subClusters", clusterList);
59  
60      HashMap[] nodes;
61      HashMap<String, String> node;
62      HashMap<String, Object> clusterData;
63      for (int n = 0; n < numClusters; n++)
64      {
65        int numNodes = instancesPerCluster.get(n).size();
66        nodes = new HashMap[numNodes];
67        for (int i = 0; i < numNodes; i++)
68        {
69          node = new HashMap<String, String>();
70          node.put("partition", instancesPerCluster.get(n).get(i));
71          nodes[i] = node;
72        }
73        clusterData = new HashMap<String, Object>();
74        clusterData.put("weight", clusterWeights.get(n));
75        clusterData.put("nodes", nodes);
76        clusterList[n] = clusterData;
77      }
78      return config;
79    }
80  
81    /**
82     * Calculate the ideal state for list of instances clusters.
83     *
84     * @param numClusters
85     *          number of node clusters
86     * @param instanceClusters
87     *          List of clusters, each contain a list of node name strings.
88     * @param instanceClusterWeights
89     *          the weight for each instance cluster
90     * @param partitions
91     *          the partition number of the database
92     * @param replicas
93     *          the replication degree
94     * @param resourceName
95     *          the name of the database
96     * @return The ZNRecord that contains the ideal state
97     */
98    public static ZNRecord calculateIdealState(
99        List<List<String>> instanceClusters,
100       List<Integer> instanceClusterWeights, int partitions, int replicas,
101       String resourceName) throws Exception
102   {
103     ZNRecord result = new ZNRecord(resourceName);
104 
105     int numberOfClusters = instanceClusters.size();
106     List<List<String>> nodesInClusters = instanceClusters;
107     List<Integer> clusterWeights = instanceClusterWeights;
108 
109     HashMap<String, Object> rushConfig = buildRushConfig(numberOfClusters,
110         nodesInClusters, replicas + 1, clusterWeights);
111     RUSHrHash rushHash = new RUSHrHash(rushConfig);
112 
113     Random r = new Random(0);
114     for (int i = 0; i < partitions; i++)
115     {
116       int partitionId = i;
117       String partitionName = resourceName + ".partition-" + partitionId;
118 
119       ArrayList<HashMap> partitionAssignmentResult = rushHash
120           .findNode(i);
121       List<String> nodeNames = new ArrayList<String>();
122       for (HashMap<?, ?> p : partitionAssignmentResult)
123       {
124         for (Object key : p.keySet())
125         {
126           if (p.get(key) instanceof String)
127           {
128             nodeNames.add(p.get(key).toString());
129           }
130         }
131       }
132       Map<String, String> partitionAssignment = new TreeMap<String, String>();
133 
134       for (int j = 0; j < nodeNames.size(); j++)
135       {
136         partitionAssignment.put(nodeNames.get(j), "SLAVE");
137       }
138       int master = r.nextInt(nodeNames.size());
139       //master = nodeNames.size()/2;
140       partitionAssignment.put(nodeNames.get(master), "MASTER");
141 
142 
143       result.setMapField(partitionName, partitionAssignment);
144     }
145     result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
146     return result;
147   }
148 
149   public static ZNRecord calculateIdealState(
150       List<String> instanceClusters,
151       int instanceClusterWeight, int partitions, int replicas,
152       String resourceName) throws Exception
153   {
154     List<List<String>> instanceClustersList = new ArrayList<List<String>>();
155     instanceClustersList.add(instanceClusters);
156 
157     List<Integer> instanceClusterWeightList = new ArrayList<Integer>();
158     instanceClusterWeightList.add(instanceClusterWeight);
159 
160     return calculateIdealState(
161         instanceClustersList,
162         instanceClusterWeightList, partitions, replicas,
163         resourceName);
164   }
165   /**
166    * Helper function to see how many partitions are mapped to different
167    * instances in two ideal states
168    * */
169   public static void printDiff(ZNRecord record1, ZNRecord record2)
170   {
171     int diffCount = 0;
172     int diffCountMaster = 0;
173     for (String key : record1.getMapFields().keySet())
174     {
175       Map<String, String> map1 = record1.getMapField(key);
176       Map<String, String> map2 = record2.getMapField(key);
177 
178       for (String k : map1.keySet())
179       {
180         if (!map2.containsKey(k))
181         {
182           diffCount++;
183         }
184         else if (!map1.get(k).equalsIgnoreCase(map2.get(k)))
185         {
186           diffCountMaster++;
187         }
188       }
189     }
190     System.out.println("\ndiff count = " + diffCount);
191     System.out.println("\nmaster diff count:"+ diffCountMaster);
192   }
193 
194   /**
195    * Helper function to calculate and print the standard deviation of the
196    * partition assignment ideal state.
197    * */
198   public static void printIdealStateStats(ZNRecord record)
199   {
200     Map<String, Integer> countsMap = new TreeMap<String, Integer>();
201     Map<String, Integer> masterCountsMap = new TreeMap<String, Integer>();
202     for (String key : record.getMapFields().keySet())
203     {
204       Map<String, String> map1 = record.getMapField(key);
205       for (String k : map1.keySet())
206       {
207         if (!countsMap.containsKey(k))
208         {
209           countsMap.put(k, new Integer(0));
210         }
211         else
212         {
213           countsMap.put(k, countsMap.get(k).intValue() + 1);
214         }
215         if (!masterCountsMap.containsKey(k))
216         {
217           masterCountsMap.put(k, new Integer(0));
218 
219         }
220         else if (map1.get(k).equalsIgnoreCase("MASTER"))
221         {
222           masterCountsMap.put(k, masterCountsMap.get(k).intValue() + 1);
223         }
224       }
225     }
226     double sum = 0;
227     int maxCount = 0;
228     int minCount = Integer.MAX_VALUE;
229     for (String k : countsMap.keySet())
230     {
231       int count = countsMap.get(k);
232       sum += count;
233       if (maxCount < count)
234       {
235         maxCount = count;
236       }
237       if (minCount > count)
238       {
239         minCount = count;
240       }
241       System.out.print(count + " ");
242     }
243     System.out.println("\nMax count: " + maxCount + " min count:" + minCount);
244     System.out.println("\n master:");
245     double sumMaster = 0;
246     int maxCountMaster = 0;
247     int minCountMaster = Integer.MAX_VALUE;
248     for (String k : masterCountsMap.keySet())
249     {
250       int count = masterCountsMap.get(k);
251       sumMaster += count;
252       if (maxCountMaster < count)
253       {
254         maxCountMaster = count;
255       }
256       if (minCountMaster > count)
257       {
258         minCountMaster = count;
259       }
260       System.out.print(count + " ");
261     }
262     System.out.println("\nMean master: "+ 1.0*sumMaster/countsMap.size());
263     System.out.println("Max master count: " + maxCountMaster + " min count:" + minCountMaster);
264     double mean = sum / (countsMap.size());
265     // calculate the deviation of the node distribution
266     double deviation = 0;
267     for (String k : countsMap.keySet())
268     {
269       double count = countsMap.get(k);
270       deviation += (count - mean) * (count - mean);
271     }
272     System.out.println("Mean: " + mean + " normal deviation:"
273         + Math.sqrt(deviation / countsMap.size()) / mean);
274 
275     //System.out.println("Max count: " + maxCount + " min count:" + minCount);
276     int steps = 10;
277     int stepLen = (maxCount - minCount) / steps;
278     if(stepLen == 0) return;
279     List<Integer> histogram = new ArrayList<Integer>((maxCount - minCount)
280         / stepLen + 1);
281 
282     for (int i = 0; i < (maxCount - minCount) / stepLen + 1; i++)
283     {
284       histogram.add(0);
285     }
286     for (String k : countsMap.keySet())
287     {
288       int count = countsMap.get(k);
289       int stepNo = (count - minCount) / stepLen;
290       histogram.set(stepNo, histogram.get(stepNo) + 1);
291     }
292     System.out.println("histogram:");
293     for (Integer x : histogram)
294     {
295       System.out.print(x + " ");
296     }
297   }
298 
299   public static void main(String args[]) throws Exception
300   {
301     int partitions = 4096, replicas = 2;
302     String resourceName = "espressoDB1";
303     List<String> instanceNames = new ArrayList<String>();
304     List<List<String>> instanceCluster1 = new ArrayList<List<String>>();
305     for (int i = 0; i < 20; i++)
306     {
307       instanceNames.add("local"+i+"host_123" + i);
308     }
309     instanceCluster1.add(instanceNames);
310     List<Integer> weights1 = new ArrayList<Integer>();
311     weights1.add(1);
312     ZNRecord result = IdealStateCalculatorByRush.calculateIdealState(
313         instanceCluster1, weights1, partitions, replicas, resourceName);
314 
315     printIdealStateStats(result);
316 
317     List<String> instanceNames2 = new ArrayList<String>();
318     for (int i = 400; i < 405; i++)
319     {
320       instanceNames2.add("localhost_123" + i);
321     }
322     instanceCluster1.add(instanceNames2);
323     weights1.add(1);
324     ZNRecord result2 = IdealStateCalculatorByRush.calculateIdealState(
325         instanceCluster1, weights1, partitions, replicas, resourceName);
326 
327     printDiff(result, result2);
328     printIdealStateStats(result2);
329   }
330 }