1 package org.apache.helix.tools;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.TreeMap;
28
29 import org.apache.helix.HelixException;
30 import org.apache.helix.model.IdealState;
31
32
33 public class IdealStateCalculatorForEspressoRelay
34 {
35 public static IdealState calculateRelayIdealState(List<String> partitions, List<String> instances,
36 String resultRecordName, int replica, String firstValue, String restValue, String stateModelName)
37 {
38 Collections.sort(partitions);
39 Collections.sort(instances);
40 if(instances.size() % replica != 0)
41 {
42 throw new HelixException("Instances must be divided by replica");
43 }
44
45 IdealState result = new IdealState(resultRecordName);
46 result.setNumPartitions(partitions.size());
47 result.setReplicas("" + replica);
48 result.setStateModelDefRef(stateModelName);
49
50 int groups = instances.size() / replica;
51 int remainder = instances.size() % replica;
52
53 int remainder2 = partitions.size() % groups;
54 int storageNodeGroupSize = partitions.size() / groups;
55
56 for(int i = 0; i < groups; i++)
57 {
58 int relayStart = 0, relayEnd = 0, storageNodeStart = 0, storageNodeEnd = 0;
59 if(i < remainder)
60 {
61 relayStart = (replica + 1) * i;
62 relayEnd = (replica + 1) *(i + 1);
63 }
64 else
65 {
66 relayStart = (replica + 1) * remainder + replica * (i - remainder);
67 relayEnd = relayStart + replica;
68 }
69
70 if(i < remainder2)
71 {
72 storageNodeStart = (storageNodeGroupSize + 1) * i;
73 storageNodeEnd = (storageNodeGroupSize + 1) *(i + 1);
74 }
75 else
76 {
77 storageNodeStart = (storageNodeGroupSize + 1) * remainder2 + storageNodeGroupSize * (i - remainder2);
78 storageNodeEnd = storageNodeStart + storageNodeGroupSize;
79 }
80
81
82 List<String> snBatch = partitions.subList(storageNodeStart, storageNodeEnd);
83 List<String> relayBatch = instances.subList(relayStart, relayEnd);
84
85 Map<String, List<String>> sublistFields = calculateSubIdealState(snBatch, relayBatch, replica);
86
87 result.getRecord().getListFields().putAll(sublistFields);
88 }
89
90 for(String snName : result.getRecord().getListFields().keySet())
91 {
92 Map<String, String> mapField = new TreeMap<String, String>();
93 List<String> relayCandidates = result.getRecord().getListField(snName);
94 mapField.put(relayCandidates.get(0), firstValue);
95 for(int i = 1; i < relayCandidates.size(); i++)
96 {
97 mapField.put(relayCandidates.get(i), restValue);
98 }
99 result.getRecord().getMapFields().put(snName, mapField);
100 }
101 System.out.println();
102 return result;
103 }
104
105 private static Map<String, List<String>> calculateSubIdealState(
106 List<String> snBatch, List<String> relayBatch, int replica)
107 {
108 Map<String, List<String>> result = new HashMap<String, List<String>>();
109 for(int i = 0; i < snBatch.size(); i++)
110 {
111 String snName = snBatch.get(i);
112 result.put(snName, new ArrayList<String>());
113 for(int j = 0; j < replica; j++)
114 {
115 result.get(snName).add(relayBatch.get((j + i) % (relayBatch.size())));
116 }
117 }
118 return result;
119 }
120 }