View Javadoc

1   package org.apache.helix.tools;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.*;
23  import java.util.zip.CRC32;
24  
25  public class RUSHrHash
26  {
27    /**
28     * @var int holds the value for how many replicas to create for an object
29     */
30    protected int replicationDegree = 1;
31  
32    /**
33     * an array of hash maps where each hash map holds info on the sub cluster
34     * that corresponds to the array indices meaning that array element 0 holds
35     * data for server 0
36     * 
37     * that is the total number of nodes in the cluster this property is populated
38     * at construction time only
39     * 
40     * @var
41     */
42  
43    protected HashMap[] clusters;
44  
45    /**
46     * an array of hash maps where each element holds data for a sub cluster
47     */
48    protected HashMap[] clusterConfig;
49  
50    /**
51     * total number of sub-clusters in our data configuration this property is
52     * populated at construction time only
53     * 
54     * @var integer
55     */
56    protected int totalClusters = 0;
57  
58    /**
59     * the total number of nodes in all of the subClusters this property is
60     * populated at construction time only
61     * 
62     * @var integer
63     */
64    protected int totalNodes = 0;
65  
66    /**
67     * the total number of nodes in all of the clusters this property is populated
68     * at construction time only
69     * 
70     * @var integer
71     */
72    protected int totalNodesW = 0;
73  
74    /**
75     * an array of HashMaps where each HashMap holds the data for a single node
76     */
77    protected HashMap[] nodes = null;
78  
79    /**
80     * @var integer value used to help seed the random number generator
81     */
82    protected final int SEED_PARAM = 1560;
83  
84    /**
85     * random number generator
86     */
87  
88    Random ran = new Random();
89  
90    /**
91     * maximum value we can have from the ran generator
92     */
93    float ranMax = (float) Math.pow(2.0, 16.0);
94  
95    /**
96     * The constructor analyzes the passed config to obtain the fundamental values
97     * and data structures for locating a node. Each of those values is described
98     * in detail above with each property. briefly:
99     * 
100    * this.clusters this.totalClusters this.totalNodes
101    * 
102    * The values above are derived from the HashMap[] oonfig passed to the
103    * locator.
104    * 
105    * @param conf
106    *          dataConfig
107    * 
108    * @throws Exception
109    * 
110    */
111 
112   public RUSHrHash(HashMap<String, Object> conf) throws Exception
113   {
114 
115     clusterConfig = (HashMap[]) conf.get("subClusters");
116     replicationDegree = (Integer) conf.get("replicationDegree");
117 
118     HashMap[] subClusters = (HashMap[]) conf.get("subClusters");
119     totalClusters = subClusters.length;
120     clusters = new HashMap[totalClusters];
121     // check the confg for all of the params
122     // throw a exception if they are not there
123     if (totalClusters <= 0)
124     {
125       throw new Exception(
126           "data config to the RUSHr locator does not contain a valid clusters property");
127     }
128 
129     int nodeCt = 0;
130     HashMap[] nodeData = null;
131     ArrayList<HashMap> tempNodes = new ArrayList<HashMap>();
132     HashMap<String, Object> subCluster = null, clusterData = null;
133     Integer clusterDataList[] = null;
134     for (int i = 0; i < totalClusters; i++)
135     {
136       subCluster = subClusters[i];
137       nodeData = (HashMap[]) subCluster.get("nodes");
138 
139       nodeCt = nodeData.length;
140       clusterDataList = new Integer[nodeCt];
141       for (int n = 0; n < nodeCt; n++)
142       {
143         tempNodes.add(nodeData[n]);
144         clusterDataList[n] = n;
145       }
146       totalNodes += nodeCt;
147       totalNodesW += nodeCt * (Integer) subCluster.get("weight");
148 
149       clusterData = new HashMap<String, Object>();
150       clusterData.put("count", nodeCt);
151       clusterData.put("list", clusterDataList);
152       clusters[i] = clusterData;
153     }
154     nodes = new HashMap[totalNodes];
155     tempNodes.toArray(nodes);
156   }
157 
158   /**
159    * This function is an implementation of a RUSHr algorithm as described by R J
160    * Honicky and Ethan Miller
161    * 
162    * @param objKey
163    * @throws Exception
164    * @return
165    */
166   public ArrayList<HashMap> findNode(long objKey) throws Exception
167   {
168 
169     HashMap[] c = this.clusters;
170     int sumRemainingNodes = this.totalNodes;
171     int sumRemainingNodesW = this.totalNodesW;
172     int repDeg = this.replicationDegree;
173     int totClu = this.totalClusters;
174     int totNod = this.totalNodes;
175     HashMap[] clusConfig = this.clusterConfig;
176 
177     // throw an exception if the data is no good
178     if ((totNod <= 0) || (totClu <= 0))
179     {
180       throw new Exception(
181           "the total nodes or total clusters is negative or 0.  bad joo joos!");
182     }
183 
184     // get the starting cluster
185     int currentCluster = totClu - 1;
186 
187     /**
188      * this loop is an implementation of the RUSHr algorithm for fast placement
189      * and location of objects in a distributed storage system
190      * 
191      * j = current cluster m = disks in current cluster n = remaining nodes
192      */
193     ArrayList<HashMap> nodeData = new ArrayList<HashMap>();
194     while (true)
195     {
196 
197       // prevent an infinite loop, in case there is a bug
198       if (currentCluster < 0)
199       {
200         throw new Exception(
201             "the cluster index became negative while we were looking for the following id: objKey.  This should never happen with any key.  There is a bug or maybe your joo joos are BAD!");
202       }
203 
204       HashMap clusterData = clusConfig[currentCluster];
205       Integer weight = (Integer) clusterData.get("weight");
206 
207       Integer disksInCurrentCluster = (Integer) c[currentCluster].get("count");
208       sumRemainingNodes -= disksInCurrentCluster;
209 
210       Integer disksInCurrentClusterW = disksInCurrentCluster * weight;
211       sumRemainingNodesW -= disksInCurrentClusterW;
212 
213       // set the seed to our set id
214       long seed = objKey + currentCluster;
215       ran.setSeed(seed);
216       int t = (repDeg - sumRemainingNodes) > 0 ? (repDeg - sumRemainingNodes)
217           : 0;
218 
219       int u = t
220           + drawWHG(repDeg - t, disksInCurrentClusterW - t,
221               disksInCurrentClusterW + sumRemainingNodesW - t, weight);
222       if (u > 0)
223       {
224         if (u > disksInCurrentCluster)
225         {
226           u = disksInCurrentCluster;
227         }
228         ran.setSeed(objKey + currentCluster + SEED_PARAM);
229         choose(u, currentCluster, sumRemainingNodes, nodeData);
230         reset(u, currentCluster);
231         repDeg -= u;
232       }
233       if (repDeg == 0)
234       {
235         break;
236       }
237       currentCluster--;
238     }
239     return nodeData;
240   }
241 
242   /**
243    * This function is an implementation of a RUSH algorithm as described by R J
244    * Honicky and Ethan Miller
245    * 
246    * @param objKey
247    *          - an int used as the prng seed. this int is usually derived from a
248    *          string hash
249    * 
250    * @return node - holds three values: abs_node - an int which is the absolute
251    *         position of the located node in relation to all nodes on all
252    *         subClusters rel_node - an int which is the relative postion located
253    *         node within the located cluster cluster - an int which is the
254    *         located cluster
255    * @throws Exception
256    * 
257    */
258   public ArrayList<HashMap> findNode(String objKey) throws Exception
259   {
260     // turn a string identifier into an integer for the random seed
261     CRC32 crc32 = new CRC32();
262     byte[] bytes = objKey.getBytes();
263     crc32.update(bytes);
264     long crc32Value = crc32.getValue();
265     long objKeyLong = (crc32Value >> 16) & 0x7fff;
266     return findNode(objKeyLong);
267   }
268 
269   public void reset(int nodesToRetrieve, int currentCluster)
270   {
271     Integer[] list = (Integer[]) clusters[currentCluster].get("list");
272     Integer count = (Integer) clusters[currentCluster].get("count");
273 
274     int listIdx;
275     int val;
276     for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++)
277     {
278       listIdx = count - nodesToRetrieve + nodeIdx;
279       val = list[listIdx];
280       if (val < (count - nodesToRetrieve))
281       {
282         list[val] = val;
283       }
284       list[listIdx] = listIdx;
285     }
286   }
287 
288   public void choose(int nodesToRetrieve, int currentCluster,
289       int remainingNodes, ArrayList<HashMap> nodeData)
290   {
291     Integer[] list = (Integer[]) clusters[currentCluster].get("list");
292     Integer count = (Integer) clusters[currentCluster].get("count");
293 
294     int maxIdx;
295     int randNode;
296     int chosen;
297     for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++)
298     {
299       maxIdx = count - nodeIdx - 1;
300       randNode = ran.nextInt(maxIdx + 1);
301       // swap
302       chosen = list[randNode];
303       list[randNode] = list[maxIdx];
304       list[maxIdx] = chosen;
305       // add the remaining nodes so we can find the node data when we are done
306       nodeData.add(nodes[remainingNodes + chosen]);
307     }
308   }
309 
310   /**
311    * @param objKey
312    * @return
313    * @throws com.targetnode.data.locator.Exception
314    */
315   public ArrayList<HashMap> findNodes(String objKey) throws Exception
316   {
317     return findNode(objKey);
318   }
319 
320   public int getReplicationDegree()
321   {
322     return replicationDegree;
323   }
324 
325   public int getTotalNodes()
326   {
327     return totalNodes;
328   }
329 
330   public int drawWHG(int replicas, int disksInCurrentCluster, int totalDisks,
331       int weight)
332   {
333     int found = 0;
334     float z;
335     float prob;
336     int ranInt;
337 
338     for (int i = 0; i < replicas; i++)
339     {
340       if (totalDisks != 0)
341       {
342         ranInt = ran.nextInt((int) (ranMax + 1));
343         z = ((float) ranInt / ranMax);
344         prob = ((float) disksInCurrentCluster / (float) totalDisks);
345         if (z <= prob)
346         {
347           found++;
348           disksInCurrentCluster -= weight;
349         }
350         totalDisks -= weight;
351       }
352     }
353     return found;
354   }
355 }