View Javadoc

1   package org.apache.helix.tools;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.Comparator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Random;
28  import java.util.Set;
29  import java.util.TreeMap;
30  import java.util.TreeSet;
31  
32  import org.apache.helix.ZNRecord;
33  import org.apache.helix.model.IdealState.IdealStateProperty;
34  
35  
36  public class IdealCalculatorByConsistentHashing
37  {
/**
 * Strategy for turning a string key into an integer hash value.
 */
public interface HashFunction
{
  /**
   * Computes the hash value of the given key.
   *
   * @param key
   *          the string to hash
   * @return the integer hash value of {@code key}
   */
  int getHashValue(String key);
}
45  
46    /**
47     * The default string hash function. Same as the default function used by
48     * Voldmort
49     */
50    public static class FnvHash implements HashFunction
51    {
52      private static final long FNV_BASIS = 0x811c9dc5;
53      private static final long FNV_PRIME = (1 << 24) + 0x193;
54      public static final long FNV_BASIS_64 = 0xCBF29CE484222325L;
55      public static final long FNV_PRIME_64 = 1099511628211L;
56  
57      public int hash(byte[] key)
58      {
59        long hash = FNV_BASIS;
60        for (int i = 0; i < key.length; i++)
61        {
62          hash ^= 0xFF & key[i];
63          hash *= FNV_PRIME;
64        }
65        return (int) hash;
66      }
67  
68      public long hash64(long val)
69      {
70        long hashval = FNV_BASIS_64;
71        for (int i = 0; i < 8; i++)
72        {
73          long octet = val & 0x00ff;
74          val = val >> 8;
75          hashval = hashval ^ octet;
76          hashval = hashval * FNV_PRIME_64;
77        }
78        return Math.abs(hashval);
79      }
80  
81      @Override
82      public int getHashValue(String key)
83      {
84        return hash(key.getBytes());
85      }
86  
87    }
88  
89    /**
90     * Calculate the ideal state for list of instances clusters using consistent
91     * hashing.
92     *
93     * @param instanceNames
94     *          List of instance names.
95     * @param partitions
96     *          the partition number of the database
97     * @param replicas
98     *          the replication degree
99     * @param resourceName
100    *          the name of the database
101    * @return The ZNRecord that contains the ideal state
102    */
103   public static ZNRecord calculateIdealState(List<String> instanceNames,
104       int partitions, int replicas, String resourceName, HashFunction hashFunc)
105   {
106     return calculateIdealState(instanceNames, partitions, replicas, resourceName,
107         hashFunc, 65536);
108   }
109 
110   /**
111    * Calculate the ideal state for list of instances clusters using consistent
112    * hashing.
113    *
114    * @param instanceNames
115    *          List of instance names.
116    * @param partitions
117    *          the partition number of the database
118    * @param replicas
119    *          the replication degree
120    * @param resourceName
121    *          the name of the database
122    * @param hashRingSize
123    *          the size of the hash ring used by consistent hashing
124    * @return The ZNRecord that contains the ideal state
125    */
126   public static ZNRecord calculateIdealState(List<String> instanceNames,
127       int partitions, int replicas, String resourceName, HashFunction hashFunc,
128       int hashRingSize)
129   {
130     ZNRecord result = new ZNRecord(resourceName);
131 
132     int[] hashRing = generateEvenHashRing(instanceNames, hashRingSize);
133     result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
134     Random rand = new Random(0xc0ffee);
135     for (int i = 0; i < partitions; i++)
136     {
137       String partitionName = resourceName + ".partition-" + i;
138       int hashPos = rand.nextInt() % hashRingSize;
139       // (int)(hashFunc.getHashValue(partitionName) % hashRingSize);
140       hashPos = hashPos < 0 ? (hashPos + hashRingSize) : hashPos;
141       // System.out.print(hashPos+ " ");
142       // if(i % 120 == 0) System.out.println();
143       Map<String, String> partitionAssignment = new TreeMap<String, String>();
144       // the first in the list is the node that contains the master
145       int masterPos = hashRing[hashPos];
146       partitionAssignment.put(instanceNames.get(masterPos), "MASTER");
147 
148       // partitionAssignment.put("hash", "" + hashPos + " " + masterPos);
149 
150       // Put slaves in next has ring positions. We need to make sure that no
151       // more than 2 slaves
152       // are mapped to one node.
153       for (int j = 1; j <= replicas; j++)
154       {
155         String next = instanceNames.get(hashRing[(hashPos + j) % hashRingSize]);
156         while (partitionAssignment.containsKey(next))
157         {
158           hashPos++;
159           next = instanceNames.get(hashRing[(hashPos + j) % hashRingSize]);
160         }
161         partitionAssignment.put(next, "SLAVE");
162       }
163       result.setMapField(partitionName, partitionAssignment);
164     }
165     return result;
166   }
167 
168   /**
169    * Generate the has ring for consistent hashing.
170    *
171    * @param instanceNames
172    *          List of instance names.
173    * @param hashRingSize
174    *          the size of the hash ring used by consistent hashing
175    * @return The int array as the hashing. it contains random values ranges from
176    *         0..size of instanceNames-1
177    */
178   public static int[] generateHashRing(List<String> instanceNames,
179       int hashRingSize)
180   {
181     int[] result = new int[hashRingSize];
182     for (int i = 0; i < result.length; i++)
183     {
184       result[i] = 0;
185     }
186     int instances = instanceNames.size();
187     // The following code generates the random distribution
188     for (int i = 1; i < instances; i++)
189     {
190       putNodeOnHashring(result, i, hashRingSize / (i + 1), i);
191     }
192     return result;
193   }
194 
195   public static int[] generateEvenHashRing(List<String> instanceNames,
196       int hashRingSize)
197   {
198     int[] result = new int[hashRingSize];
199     for (int i = 0; i < result.length; i++)
200     {
201       result[i] = 0;
202     }
203     int instances = instanceNames.size();
204     // The following code generates the random distribution
205     for (int i = 1; i < instances; i++)
206     {
207       putNodeEvenOnHashRing(result, i, i + 1);
208     }
209     return result;
210   }
211 
212   private static void putNodeEvenOnHashRing(int[] hashRing, int nodeVal,
213       int totalValues)
214   {
215     int newValNum = hashRing.length / totalValues;
216     assert (newValNum > 0);
217     Map<Integer, List<Integer>> valueIndex = buildValueIndex(hashRing);
218     int nSources = valueIndex.size();
219     int remainder = newValNum % nSources;
220 
221     List<List<Integer>> positionLists = new ArrayList<List<Integer>>();
222     for (List<Integer> list : valueIndex.values())
223     {
224       positionLists.add(list);
225     }
226     class ListComparator implements Comparator<List<Integer>>
227     {
228       @Override
229       public int compare(List<Integer> o1, List<Integer> o2)
230       {
231         return (o1.size() > o2.size() ? -1 : (o1.size() == o2.size() ? 0 : 1));
232       }
233     }
234     Collections.sort(positionLists, new ListComparator());
235 
236     for (List<Integer> oldValPositions : positionLists)
237     {
238       // List<Integer> oldValPositions = valueIndex.get(oldVal);
239       int nValsToReplace = newValNum / nSources;
240       assert (nValsToReplace > 0);
241       if (remainder > 0)
242       {
243         nValsToReplace++;
244         remainder--;
245       }
246       // System.out.print(oldValPositions.size()+" "+nValsToReplace+"  ");
247       putNodeValueOnHashRing(hashRing, nodeVal, nValsToReplace, oldValPositions);
248       // randomly take nValsToReplace positions in oldValPositions and make them
249     }
250     // System.out.println();
251   }
252 
253   private static void putNodeValueOnHashRing(int[] hashRing, int nodeVal,
254       int numberOfValues, List<Integer> positions)
255   {
256     Random rand = new Random(nodeVal);
257     // initialize the index array
258     int[] index = new int[positions.size()];
259     for (int i = 0; i < index.length; i++)
260     {
261       index[i] = i;
262     }
263 
264     int nodesLeft = index.length;
265 
266     for (int i = 0; i < numberOfValues; i++)
267     {
268       // Calculate a random index
269       int randIndex = rand.nextInt() % nodesLeft;
270       if (randIndex < 0)
271       {
272         randIndex += nodesLeft;
273       }
274       hashRing[positions.get(index[randIndex])] = nodeVal;
275 
276       // swap the random index and the last available index, and decrease the
277       // nodes left
278       int temp = index[randIndex];
279       index[randIndex] = index[nodesLeft - 1];
280       index[nodesLeft - 1] = temp;
281       nodesLeft--;
282     }
283   }
284 
285   private static Map<Integer, List<Integer>> buildValueIndex(int[] hashRing)
286   {
287     Map<Integer, List<Integer>> result = new TreeMap<Integer, List<Integer>>();
288     for (int i = 0; i < hashRing.length; i++)
289     {
290       if (!result.containsKey(hashRing[i]))
291       {
292         List<Integer> list = new ArrayList<Integer>();
293         result.put(hashRing[i], list);
294       }
295       result.get(hashRing[i]).add(i);
296     }
297     return result;
298   }
299 
300   /**
301    * Uniformly put node values on the hash ring. Derived from the shuffling
302    * algorithm
303    *
304    * @param result
305    *          the hash ring array.
306    * @param nodeValue
307    *          the int value to be added to the hash ring this time
308    * @param numberOfNodes
309    *          number of node values to put on the hash ring array
310    * @param randomSeed
311    *          the random seed
312    */
313   public static void putNodeOnHashring(int[] result, int nodeValue,
314       int numberOfNodes, int randomSeed)
315   {
316     Random rand = new Random(randomSeed);
317     // initialize the index array
318     int[] index = new int[result.length];
319     for (int i = 0; i < index.length; i++)
320     {
321       index[i] = i;
322     }
323 
324     int nodesLeft = index.length;
325 
326     for (int i = 0; i < numberOfNodes; i++)
327     {
328       // Calculate a random index
329       int randIndex = rand.nextInt() % nodesLeft;
330       if (randIndex < 0)
331       {
332         randIndex += nodesLeft;
333       }
334       if (result[index[randIndex]] == nodeValue)
335       {
336         assert (false);
337       }
338       result[index[randIndex]] = nodeValue;
339 
340       // swap the random index and the last available index, and decrease the
341       // nodes left
342       int temp = index[randIndex];
343       index[randIndex] = index[nodesLeft - 1];
344       index[nodesLeft - 1] = temp;
345 
346       nodesLeft--;
347     }
348   }
349 
350   /**
351    * Helper function to see how many partitions are mapped to different
352    * instances in two ideal states
353    * */
354   public static void printDiff(ZNRecord record1, ZNRecord record2)
355   {
356     int diffCount = 0;
357     for (String key : record1.getMapFields().keySet())
358     {
359       Map<String, String> map1 = record1.getMapField(key);
360       Map<String, String> map2 = record2.getMapField(key);
361 
362       for (String k : map1.keySet())
363       {
364         if (!map2.containsKey(k))
365         {
366           diffCount++;
367         } else if (!map1.get(k).equalsIgnoreCase(map2.get(k)))
368         {
369           diffCount++;
370         }
371       }
372     }
373     System.out.println("diff count = " + diffCount);
374   }
375 
376   /**
377    * Helper function to compare the difference between two hashing buffers
378    * */
379   public static void compareHashrings(int[] ring1, int[] ring2)
380   {
381     int diff = 0;
382     for (int i = 0; i < ring1.length; i++)
383     {
384       if (ring1[i] != ring2[i])
385       {
386         diff++;
387       }
388     }
389     System.out.println("ring diff: " + diff);
390   }
391 
392   public static void printNodeOfflineOverhead(ZNRecord record)
393   {
394     // build node -> partition map
395     Map<String, Set<String>> nodeNextMap = new TreeMap<String, Set<String>>();
396     for (String partitionName : record.getMapFields().keySet())
397     {
398       Map<String, String> map1 = record.getMapField(partitionName);
399       String master = "", slave = "";
400       for (String nodeName : map1.keySet())
401       {
402         if (!nodeNextMap.containsKey(nodeName))
403         {
404           nodeNextMap.put(nodeName, new TreeSet<String>());
405         }
406 
407         // String master = "", slave = "";
408         if (map1.get(nodeName).equalsIgnoreCase("MASTER"))
409         {
410           master = nodeName;
411         } else
412         {
413           if (slave.equalsIgnoreCase(""))
414           {
415             slave = nodeName;
416           }
417         }
418 
419       }
420       nodeNextMap.get(master).add(slave);
421     }
422     System.out.println("next count: ");
423     for (String key : nodeNextMap.keySet())
424     {
425       System.out.println(nodeNextMap.get(key).size() + " ");
426     }
427     System.out.println();
428   }
429 
430   /**
431    * Helper function to calculate and print the standard deviation of the
432    * partition assignment ideal state, also the min/max of master partitions
433    * that is hosted on each node
434    * */
435   public static void printIdealStateStats(ZNRecord record, String value)
436   {
437     Map<String, Integer> countsMap = new TreeMap<String, Integer>();
438     for (String key : record.getMapFields().keySet())
439     {
440       Map<String, String> map1 = record.getMapField(key);
441       for (String k : map1.keySet())
442       {
443         if (!countsMap.containsKey(k))
444         {
445           countsMap.put(k, new Integer(0));//
446         }
447         if (value.equals("") || map1.get(k).equalsIgnoreCase(value))
448         {
449           countsMap.put(k, countsMap.get(k).intValue() + 1);
450         }
451       }
452     }
453     double sum = 0;
454     int maxCount = 0;
455     int minCount = Integer.MAX_VALUE;
456 
457     System.out.println("Partition distributions: ");
458     for (String k : countsMap.keySet())
459     {
460       int count = countsMap.get(k);
461       sum += count;
462       if (maxCount < count)
463       {
464         maxCount = count;
465       }
466       if (minCount > count)
467       {
468         minCount = count;
469       }
470       System.out.print(count + " ");
471     }
472     System.out.println();
473     double mean = sum / (countsMap.size());
474     // calculate the deviation of the node distribution
475     double deviation = 0;
476     for (String k : countsMap.keySet())
477     {
478       double count = countsMap.get(k);
479       deviation += (count - mean) * (count - mean);
480     }
481     System.out.println("Mean: " + mean + " normal deviation:"
482         + Math.sqrt(deviation / countsMap.size()));
483 
484     System.out.println("Max count: " + maxCount + " min count:" + minCount);
485     /*
486      * int steps = 10; int stepLen = (maxCount - minCount)/steps; List<Integer>
487      * histogram = new ArrayList<Integer>((maxCount - minCount)/stepLen + 1);
488      *
489      * for(int i = 0; i< (maxCount - minCount)/stepLen + 1; i++) {
490      * histogram.add(0); } for(String k :countsMap.keySet()) { int count =
491      * countsMap.get(k); int stepNo = (count - minCount)/stepLen;
492      * histogram.set(stepNo, histogram.get(stepNo) +1); }
493      * System.out.println("histogram:"); for(Integer x : histogram) {
494      * System.out.print(x+" "); }
495      */
496   }
497 
498   public static void printHashRingStat(int[] hashRing)
499   {
500     double sum = 0, mean = 0, deviation = 0;
501     Map<Integer, Integer> countsMap = new TreeMap<Integer, Integer>();
502     for (int i = 0; i < hashRing.length; i++)
503     {
504       if (!countsMap.containsKey(hashRing[i]))
505       {
506         countsMap.put(hashRing[i], new Integer(0));//
507       }
508       countsMap.put(hashRing[i], countsMap.get(hashRing[i]).intValue() + 1);
509     }
510     int maxCount = Integer.MIN_VALUE;
511     int minCount = Integer.MAX_VALUE;
512     for (Integer k : countsMap.keySet())
513     {
514       int count = countsMap.get(k);
515       sum += count;
516       if (maxCount < count)
517       {
518         maxCount = count;
519       }
520       if (minCount > count)
521       {
522         minCount = count;
523       }
524     }
525     mean = sum / countsMap.size();
526     for (Integer k : countsMap.keySet())
527     {
528       int count = countsMap.get(k);
529       deviation += (count - mean) * (count - mean);
530     }
531     System.out.println("hashring Mean: " + mean + " normal deviation:"
532         + Math.sqrt(deviation / countsMap.size()));
533 
534   }
535 
536   static int[] getFnvHashArray(List<String> strings)
537   {
538     int[] result = new int[strings.size()];
539     int i = 0;
540     IdealCalculatorByConsistentHashing.FnvHash hashfunc = new IdealCalculatorByConsistentHashing.FnvHash();
541     for (String s : strings)
542     {
543       int val = hashfunc.getHashValue(s) % 65536;
544       if (val < 0)
545         val += 65536;
546       result[i++] = val;
547     }
548     return result;
549   }
550 
551   static void printArrayStat(int[] vals)
552   {
553     double sum = 0, mean = 0, deviation = 0;
554 
555     for (int i = 0; i < vals.length; i++)
556     {
557       sum += vals[i];
558     }
559     mean = sum / vals.length;
560     for (int i = 0; i < vals.length; i++)
561     {
562       deviation += (mean - vals[i]) * (mean - vals[i]);
563     }
564     System.out.println("normalized deviation: "
565         + Math.sqrt(deviation / vals.length) / mean);
566   }
567 
568   public static void main(String args[]) throws Exception
569   {
570     // Test the hash ring generation
571     List<String> instanceNames = new ArrayList<String>();
572     for (int i = 0; i < 10; i++)
573     {
574       instanceNames.add("localhost_123" + i);
575     }
576 
577     // int[] ring1 =
578     // IdealCalculatorByConsistentHashing.generateEvenHashRing(instanceNames,
579     // 65535);
580     // printHashRingStat(ring1);
581     // int[] ring1 = getFnvHashArray(instanceNames);
582     // printArrayStat(ring1);
583 
584     int partitions = 200, replicas = 2;
585     String dbName = "espressoDB1";
586 
587     ZNRecord result = IdealCalculatorByConsistentHashing.calculateIdealState(
588         instanceNames, partitions, replicas, dbName,
589         new IdealCalculatorByConsistentHashing.FnvHash());
590     System.out.println("\nMaster :");
591     printIdealStateStats(result, "MASTER");
592 
593     System.out.println("\nSlave :");
594     printIdealStateStats(result, "SLAVE");
595 
596     System.out.println("\nTotal :");
597     printIdealStateStats(result, "");
598 
599     printNodeOfflineOverhead(result);
600     /*
601      * ZNRecordSerializer serializer = new ZNRecordSerializer(); byte[] bytes;
602      * bytes = serializer.serialize(result); // System.out.println(new
603      * String(bytes));
604      *
605      * List<String> instanceNames2 = new ArrayList<String>(); for(int i = 0;i <
606      * 40; i++) { instanceNames2.add("localhost_123"+i); }
607      *
608      * ZNRecord result2 =
609      * IdealCalculatorByConsistentHashing.calculateIdealState( instanceNames2,
610      * partitions, replicas, dbName, new
611      * IdealCalculatorByConsistentHashing.FnvHash());
612      *
613      * printDiff(result, result2);
614      *
615      * //IdealCalculatorByConsistentHashing.printIdealStateStats(result2);
616      *
617      *
618      *
619      * int[] ring2 =
620      * IdealCalculatorByConsistentHashing.generateHashRing(instanceNames2,
621      * 30000);
622      *
623      * IdealCalculatorByConsistentHashing.compareHashrings(ring1, ring2);
624      * //printNodeStats(result); //printNodeStats(result2); bytes =
625      * serializer.serialize(result2); printHashRingStat(ring2); //
626      * System.out.println(new String(bytes));
627      */
628 
629   }
630 }