1 package org.apache.helix.tools;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.zip.CRC32;
24
/**
 * Deterministically maps an object key onto {@code replicationDegree} distinct nodes taken from
 * a list of weighted sub-clusters.
 *
 * <p>The configuration map passed to the constructor must contain:
 * <ul>
 *   <li>{@code "replicationDegree"} - an {@link Integer}, the number of nodes returned per key;</li>
 *   <li>{@code "subClusters"} - a non-empty {@code HashMap[]}; every element holds a
 *       {@code "nodes"} entry ({@code HashMap[]}, one map per node) and a {@code "weight"} entry
 *       ({@link Integer}).</li>
 * </ul>
 *
 * <p>Lookups walk the sub-clusters from the last one to the first one. For each sub-cluster the
 * shared {@link Random} is reseeded from the key and the cluster index, so the same key always
 * yields the same nodes for the same configuration.
 *
 * <p>This class is not thread-safe: every lookup reseeds the shared {@link Random} and
 * temporarily permutes the per-cluster index lists stored in {@link #clusters}.
 */
public class RUSHrHash
{
  /** Number of nodes returned for each key; read from the {@code "replicationDegree"} entry. */
  protected int replicationDegree = 1;

  /**
   * Per sub-cluster bookkeeping built by the constructor. Each map holds {@code "count"} (the
   * Integer number of nodes in the sub-cluster) and {@code "list"} (an {@code Integer[]} that
   * starts as the identity permutation, is shuffled by {@link #choose} and restored by
   * {@link #reset}).
   */
  protected HashMap[] clusters;

  /** The caller-supplied {@code "subClusters"} array; it is referenced, not copied. */
  protected HashMap[] clusterConfig;

  /** Number of sub-clusters in the configuration. */
  protected int totalClusters = 0;

  /** Number of nodes summed over all sub-clusters. */
  protected int totalNodes = 0;

  /** Sum over all sub-clusters of (node count * sub-cluster weight). */
  protected int totalNodesW = 0;

  /** All node maps, concatenated in sub-cluster order. */
  protected HashMap[] nodes = null;

  /** Offset added to the seed before picking nodes, so picking and drawing use different seeds. */
  protected final int SEED_PARAM = 1560;

  /** Generator reseeded from the key on every lookup; it is the only source of randomness. */
  Random ran = new Random();

  /** Upper bound (2^16) used to scale a random integer into the range [0, 1]. */
  float ranMax = (float) Math.pow(2.0, 16.0);

  /**
   * Builds the lookup tables from the configuration map.
   *
   * @param conf configuration containing {@code "subClusters"} and {@code "replicationDegree"}
   *        as described in the class documentation
   * @throws IllegalArgumentException if {@code conf} is null, if {@code "subClusters"} is missing,
   *         empty or not a {@code HashMap[]}, if {@code "replicationDegree"} is not an Integer, or
   *         if a sub-cluster lacks a {@code HashMap[]} {@code "nodes"} entry or an Integer
   *         {@code "weight"} entry
   * @throws Exception declared for backward compatibility with existing callers
   */
  public RUSHrHash(HashMap<String, Object> conf) throws Exception
  {
    if (conf == null)
    {
      throw new IllegalArgumentException("data config to the RUSHr locator must not be null");
    }

    // Validate before dereferencing so a bad config fails with a descriptive message instead of
    // a bare NullPointerException or ClassCastException.
    Object subClustersValue = conf.get("subClusters");
    if (!(subClustersValue instanceof HashMap[]) || ((HashMap[]) subClustersValue).length == 0)
    {
      throw new IllegalArgumentException(
          "data config to the RUSHr locator does not contain a valid clusters property");
    }
    Object replicationValue = conf.get("replicationDegree");
    if (!(replicationValue instanceof Integer))
    {
      throw new IllegalArgumentException(
          "data config to the RUSHr locator does not contain a valid replicationDegree property");
    }

    HashMap[] subClusters = (HashMap[]) subClustersValue;
    clusterConfig = subClusters;
    replicationDegree = (Integer) replicationValue;
    totalClusters = subClusters.length;
    clusters = new HashMap[totalClusters];

    List<HashMap> allNodes = new ArrayList<HashMap>();
    for (int i = 0; i < totalClusters; i++)
    {
      HashMap subCluster = subClusters[i];
      Object nodesValue = (subCluster == null) ? null : subCluster.get("nodes");
      Object weightValue = (subCluster == null) ? null : subCluster.get("weight");
      if (!(nodesValue instanceof HashMap[]) || !(weightValue instanceof Integer))
      {
        throw new IllegalArgumentException("sub-cluster " + i
            + " of the RUSHr locator config must define a HashMap[] \"nodes\" entry"
            + " and an Integer \"weight\" entry");
      }

      HashMap[] nodeData = (HashMap[]) nodesValue;
      int weight = (Integer) weightValue;
      int nodeCt = nodeData.length;

      // The index list starts as the identity permutation; choose() shuffles it and reset()
      // puts it back after every lookup.
      Integer[] clusterDataList = new Integer[nodeCt];
      for (int n = 0; n < nodeCt; n++)
      {
        allNodes.add(nodeData[n]);
        clusterDataList[n] = n;
      }
      totalNodes += nodeCt;
      totalNodesW += nodeCt * weight;

      HashMap<String, Object> clusterData = new HashMap<String, Object>();
      clusterData.put("count", nodeCt);
      clusterData.put("list", clusterDataList);
      clusters[i] = clusterData;
    }
    nodes = allNodes.toArray(new HashMap[totalNodes]);
  }

  /**
   * Finds the nodes responsible for a numeric key.
   *
   * @param objKey the key to place
   * @return {@code replicationDegree} distinct node maps, in the order they were picked
   * @throws IllegalStateException if there are no nodes or clusters, if the replication degree is
   *         negative or larger than the number of nodes, or if the walk runs out of clusters
   * @throws Exception declared for backward compatibility with existing callers
   */
  public ArrayList<HashMap> findNode(long objKey) throws Exception
  {
    if ((totalNodes <= 0) || (totalClusters <= 0))
    {
      throw new IllegalStateException(
          "the total nodes or total clusters is negative or 0. bad joo joos!");
    }

    int repDeg = replicationDegree;
    // Such a degree can never be satisfied: at most totalNodes distinct nodes exist. Reject it
    // here instead of walking every cluster and failing with an unrelated message.
    if ((repDeg < 0) || (repDeg > totalNodes))
    {
      throw new IllegalStateException("cannot place " + repDeg + " replicas for key " + objKey
          + ": the configuration only has " + totalNodes + " nodes");
    }

    int sumRemainingNodes = totalNodes;
    int sumRemainingNodesW = totalNodesW;
    int currentCluster = totalClusters - 1;

    ArrayList<HashMap> nodeData = new ArrayList<HashMap>();
    while (true)
    {
      if (currentCluster < 0)
      {
        throw new IllegalStateException(
            "the cluster index became negative while we were looking for the following id: "
                + objKey
                + ". This should never happen with any key. There is a bug or maybe your joo joos are BAD!");
      }

      int weight = (Integer) clusterConfig[currentCluster].get("weight");

      // After these subtractions the "remaining" sums cover only the clusters that have not
      // been visited yet (indices below currentCluster).
      int disksInCurrentCluster = (Integer) clusters[currentCluster].get("count");
      sumRemainingNodes -= disksInCurrentCluster;

      int disksInCurrentClusterW = disksInCurrentCluster * weight;
      sumRemainingNodesW -= disksInCurrentClusterW;

      ran.setSeed(objKey + currentCluster);

      // t replicas must be placed here because the remaining clusters cannot hold them all.
      int t = Math.max(repDeg - sumRemainingNodes, 0);

      int u = t
          + drawWHG(repDeg - t, disksInCurrentClusterW - t,
              disksInCurrentClusterW + sumRemainingNodesW - t, weight);
      if (u > 0)
      {
        if (u > disksInCurrentCluster)
        {
          u = disksInCurrentCluster;
        }
        ran.setSeed(objKey + currentCluster + SEED_PARAM);
        choose(u, currentCluster, sumRemainingNodes, nodeData);
        // Undo the shuffle so the next lookup starts from the identity permutation again.
        reset(u, currentCluster);
        repDeg -= u;
      }
      if (repDeg == 0)
      {
        break;
      }
      currentCluster--;
    }
    return nodeData;
  }

  /**
   * Finds the nodes responsible for a string key. The key is reduced to a 15-bit number taken
   * from the CRC32 of its UTF-8 bytes and then placed with {@link #findNode(long)}.
   *
   * @param objKey the key to place; must not be null
   * @return {@code replicationDegree} distinct node maps, in the order they were picked
   * @throws NullPointerException if {@code objKey} is null
   * @throws Exception see {@link #findNode(long)}
   */
  public ArrayList<HashMap> findNode(String objKey) throws Exception
  {
    Objects.requireNonNull(objKey, "objKey");

    CRC32 crc32 = new CRC32();
    // A fixed charset keeps placement identical on every JVM, whatever its default encoding.
    crc32.update(objKey.getBytes(StandardCharsets.UTF_8));
    long objKeyLong = (crc32.getValue() >> 16) & 0x7fff;
    return findNode(objKeyLong);
  }

  /**
   * Restores the index list of a sub-cluster to the identity permutation after
   * {@link #choose} moved {@code nodesToRetrieve} picked entries to the end of the list.
   *
   * @param nodesToRetrieve number of entries that were picked by the preceding {@code choose}
   * @param currentCluster index of the sub-cluster whose list is restored
   */
  public void reset(int nodesToRetrieve, int currentCluster)
  {
    Integer[] list = (Integer[]) clusters[currentCluster].get("list");
    int count = (Integer) clusters[currentCluster].get("count");
    int firstPicked = count - nodesToRetrieve;

    for (int listIdx = firstPicked; listIdx < count; listIdx++)
    {
      int val = list[listIdx];
      // A picked value from the front region left a foreign value in its home slot; fix it.
      if (val < firstPicked)
      {
        list[val] = val;
      }
      list[listIdx] = listIdx;
    }
  }

  /**
   * Picks {@code nodesToRetrieve} distinct nodes of a sub-cluster using the shared generator and
   * appends them to {@code nodeData}. Each pick is swapped to the end of the not-yet-picked part
   * of the index list so it cannot be picked twice.
   *
   * @param nodesToRetrieve number of nodes to pick; must not exceed the sub-cluster's node count
   * @param currentCluster index of the sub-cluster to pick from
   * @param remainingNodes offset of the sub-cluster's first node inside {@link #nodes}
   * @param nodeData list receiving the picked node maps
   */
  public void choose(int nodesToRetrieve, int currentCluster,
      int remainingNodes, ArrayList<HashMap> nodeData)
  {
    Integer[] list = (Integer[]) clusters[currentCluster].get("list");
    int count = (Integer) clusters[currentCluster].get("count");

    for (int nodeIdx = 0; nodeIdx < nodesToRetrieve; nodeIdx++)
    {
      int maxIdx = count - nodeIdx - 1;
      int randNode = ran.nextInt(maxIdx + 1);

      Integer chosen = list[randNode];
      list[randNode] = list[maxIdx];
      list[maxIdx] = chosen;

      nodeData.add(nodes[remainingNodes + chosen]);
    }
  }

  /**
   * Alias for {@link #findNode(String)}.
   *
   * @param objKey the key to place; must not be null
   * @return {@code replicationDegree} distinct node maps, in the order they were picked
   * @throws Exception see {@link #findNode(String)}
   */
  public ArrayList<HashMap> findNodes(String objKey) throws Exception
  {
    return findNode(objKey);
  }

  /** @return the number of nodes returned per key */
  public int getReplicationDegree()
  {
    return replicationDegree;
  }

  /** @return the number of nodes summed over all sub-clusters */
  public int getTotalNodes()
  {
    return totalNodes;
  }

  /**
   * Draws how many of {@code replicas} fall into the current sub-cluster. Every replica is one
   * trial that succeeds with probability {@code disksInCurrentCluster / totalDisks}. After a
   * success {@code disksInCurrentCluster} shrinks by {@code weight}; after every trial
   * {@code totalDisks} shrinks by {@code weight}. Trials are skipped while {@code totalDisks}
   * is 0. The name presumably stands for a weighted hypergeometric draw.
   *
   * @param replicas number of trials
   * @param disksInCurrentCluster weighted node count of the current sub-cluster
   * @param totalDisks weighted node count of the current and all unvisited sub-clusters
   * @param weight weight of the current sub-cluster
   * @return the number of successful trials, between 0 and {@code replicas}
   */
  public int drawWHG(int replicas, int disksInCurrentCluster, int totalDisks,
      int weight)
  {
    int found = 0;

    for (int i = 0; i < replicas; i++)
    {
      if (totalDisks == 0)
      {
        continue;
      }
      int ranInt = ran.nextInt((int) (ranMax + 1));
      float z = ((float) ranInt / ranMax);
      float prob = ((float) disksInCurrentCluster / (float) totalDisks);
      if (z <= prob)
      {
        found++;
        disksInCurrentCluster -= weight;
      }
      totalDisks -= weight;
    }
    return found;
  }
}