1 package org.apache.helix.tools;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.TreeMap;
28
29 import org.apache.helix.ZNRecord;
30 import org.apache.helix.model.IdealState.IdealStateProperty;
31
32
33 public class IdealStateCalculatorByRush
34 {
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50 static HashMap<String, Object> buildRushConfig(int numClusters,
51 List<List<String>> instancesPerCluster, int replicationDegree,
52 List<Integer> clusterWeights)
53 {
54 HashMap<String, Object> config = new HashMap<String, Object>();
55 config.put("replicationDegree", replicationDegree);
56
57 HashMap[] clusterList = new HashMap[numClusters];
58 config.put("subClusters", clusterList);
59
60 HashMap[] nodes;
61 HashMap<String, String> node;
62 HashMap<String, Object> clusterData;
63 for (int n = 0; n < numClusters; n++)
64 {
65 int numNodes = instancesPerCluster.get(n).size();
66 nodes = new HashMap[numNodes];
67 for (int i = 0; i < numNodes; i++)
68 {
69 node = new HashMap<String, String>();
70 node.put("partition", instancesPerCluster.get(n).get(i));
71 nodes[i] = node;
72 }
73 clusterData = new HashMap<String, Object>();
74 clusterData.put("weight", clusterWeights.get(n));
75 clusterData.put("nodes", nodes);
76 clusterList[n] = clusterData;
77 }
78 return config;
79 }
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98 public static ZNRecord calculateIdealState(
99 List<List<String>> instanceClusters,
100 List<Integer> instanceClusterWeights, int partitions, int replicas,
101 String resourceName) throws Exception
102 {
103 ZNRecord result = new ZNRecord(resourceName);
104
105 int numberOfClusters = instanceClusters.size();
106 List<List<String>> nodesInClusters = instanceClusters;
107 List<Integer> clusterWeights = instanceClusterWeights;
108
109 HashMap<String, Object> rushConfig = buildRushConfig(numberOfClusters,
110 nodesInClusters, replicas + 1, clusterWeights);
111 RUSHrHash rushHash = new RUSHrHash(rushConfig);
112
113 Random r = new Random(0);
114 for (int i = 0; i < partitions; i++)
115 {
116 int partitionId = i;
117 String partitionName = resourceName + ".partition-" + partitionId;
118
119 ArrayList<HashMap> partitionAssignmentResult = rushHash
120 .findNode(i);
121 List<String> nodeNames = new ArrayList<String>();
122 for (HashMap<?, ?> p : partitionAssignmentResult)
123 {
124 for (Object key : p.keySet())
125 {
126 if (p.get(key) instanceof String)
127 {
128 nodeNames.add(p.get(key).toString());
129 }
130 }
131 }
132 Map<String, String> partitionAssignment = new TreeMap<String, String>();
133
134 for (int j = 0; j < nodeNames.size(); j++)
135 {
136 partitionAssignment.put(nodeNames.get(j), "SLAVE");
137 }
138 int master = r.nextInt(nodeNames.size());
139
140 partitionAssignment.put(nodeNames.get(master), "MASTER");
141
142
143 result.setMapField(partitionName, partitionAssignment);
144 }
145 result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), String.valueOf(partitions));
146 return result;
147 }
148
149 public static ZNRecord calculateIdealState(
150 List<String> instanceClusters,
151 int instanceClusterWeight, int partitions, int replicas,
152 String resourceName) throws Exception
153 {
154 List<List<String>> instanceClustersList = new ArrayList<List<String>>();
155 instanceClustersList.add(instanceClusters);
156
157 List<Integer> instanceClusterWeightList = new ArrayList<Integer>();
158 instanceClusterWeightList.add(instanceClusterWeight);
159
160 return calculateIdealState(
161 instanceClustersList,
162 instanceClusterWeightList, partitions, replicas,
163 resourceName);
164 }
165
166
167
168
169 public static void printDiff(ZNRecord record1, ZNRecord record2)
170 {
171 int diffCount = 0;
172 int diffCountMaster = 0;
173 for (String key : record1.getMapFields().keySet())
174 {
175 Map<String, String> map1 = record1.getMapField(key);
176 Map<String, String> map2 = record2.getMapField(key);
177
178 for (String k : map1.keySet())
179 {
180 if (!map2.containsKey(k))
181 {
182 diffCount++;
183 }
184 else if (!map1.get(k).equalsIgnoreCase(map2.get(k)))
185 {
186 diffCountMaster++;
187 }
188 }
189 }
190 System.out.println("\ndiff count = " + diffCount);
191 System.out.println("\nmaster diff count:"+ diffCountMaster);
192 }
193
194
195
196
197
198 public static void printIdealStateStats(ZNRecord record)
199 {
200 Map<String, Integer> countsMap = new TreeMap<String, Integer>();
201 Map<String, Integer> masterCountsMap = new TreeMap<String, Integer>();
202 for (String key : record.getMapFields().keySet())
203 {
204 Map<String, String> map1 = record.getMapField(key);
205 for (String k : map1.keySet())
206 {
207 if (!countsMap.containsKey(k))
208 {
209 countsMap.put(k, new Integer(0));
210 }
211 else
212 {
213 countsMap.put(k, countsMap.get(k).intValue() + 1);
214 }
215 if (!masterCountsMap.containsKey(k))
216 {
217 masterCountsMap.put(k, new Integer(0));
218
219 }
220 else if (map1.get(k).equalsIgnoreCase("MASTER"))
221 {
222 masterCountsMap.put(k, masterCountsMap.get(k).intValue() + 1);
223 }
224 }
225 }
226 double sum = 0;
227 int maxCount = 0;
228 int minCount = Integer.MAX_VALUE;
229 for (String k : countsMap.keySet())
230 {
231 int count = countsMap.get(k);
232 sum += count;
233 if (maxCount < count)
234 {
235 maxCount = count;
236 }
237 if (minCount > count)
238 {
239 minCount = count;
240 }
241 System.out.print(count + " ");
242 }
243 System.out.println("\nMax count: " + maxCount + " min count:" + minCount);
244 System.out.println("\n master:");
245 double sumMaster = 0;
246 int maxCountMaster = 0;
247 int minCountMaster = Integer.MAX_VALUE;
248 for (String k : masterCountsMap.keySet())
249 {
250 int count = masterCountsMap.get(k);
251 sumMaster += count;
252 if (maxCountMaster < count)
253 {
254 maxCountMaster = count;
255 }
256 if (minCountMaster > count)
257 {
258 minCountMaster = count;
259 }
260 System.out.print(count + " ");
261 }
262 System.out.println("\nMean master: "+ 1.0*sumMaster/countsMap.size());
263 System.out.println("Max master count: " + maxCountMaster + " min count:" + minCountMaster);
264 double mean = sum / (countsMap.size());
265
266 double deviation = 0;
267 for (String k : countsMap.keySet())
268 {
269 double count = countsMap.get(k);
270 deviation += (count - mean) * (count - mean);
271 }
272 System.out.println("Mean: " + mean + " normal deviation:"
273 + Math.sqrt(deviation / countsMap.size()) / mean);
274
275
276 int steps = 10;
277 int stepLen = (maxCount - minCount) / steps;
278 if(stepLen == 0) return;
279 List<Integer> histogram = new ArrayList<Integer>((maxCount - minCount)
280 / stepLen + 1);
281
282 for (int i = 0; i < (maxCount - minCount) / stepLen + 1; i++)
283 {
284 histogram.add(0);
285 }
286 for (String k : countsMap.keySet())
287 {
288 int count = countsMap.get(k);
289 int stepNo = (count - minCount) / stepLen;
290 histogram.set(stepNo, histogram.get(stepNo) + 1);
291 }
292 System.out.println("histogram:");
293 for (Integer x : histogram)
294 {
295 System.out.print(x + " ");
296 }
297 }
298
299 public static void main(String args[]) throws Exception
300 {
301 int partitions = 4096, replicas = 2;
302 String resourceName = "espressoDB1";
303 List<String> instanceNames = new ArrayList<String>();
304 List<List<String>> instanceCluster1 = new ArrayList<List<String>>();
305 for (int i = 0; i < 20; i++)
306 {
307 instanceNames.add("local"+i+"host_123" + i);
308 }
309 instanceCluster1.add(instanceNames);
310 List<Integer> weights1 = new ArrayList<Integer>();
311 weights1.add(1);
312 ZNRecord result = IdealStateCalculatorByRush.calculateIdealState(
313 instanceCluster1, weights1, partitions, replicas, resourceName);
314
315 printIdealStateStats(result);
316
317 List<String> instanceNames2 = new ArrayList<String>();
318 for (int i = 400; i < 405; i++)
319 {
320 instanceNames2.add("localhost_123" + i);
321 }
322 instanceCluster1.add(instanceNames2);
323 weights1.add(1);
324 ZNRecord result2 = IdealStateCalculatorByRush.calculateIdealState(
325 instanceCluster1, weights1, partitions, replicas, resourceName);
326
327 printDiff(result, result2);
328 printIdealStateStats(result2);
329 }
330 }