1 package org.apache.helix.spectator;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.Comparator;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.TreeSet;
30 import java.util.concurrent.atomic.AtomicReference;
31
32 import org.apache.helix.ConfigChangeListener;
33 import org.apache.helix.ExternalViewChangeListener;
34 import org.apache.helix.HelixDataAccessor;
35 import org.apache.helix.NotificationContext;
36 import org.apache.helix.PropertyKey.Builder;
37 import org.apache.helix.model.ExternalView;
38 import org.apache.helix.model.InstanceConfig;
39 import org.apache.log4j.Logger;
40
41
42 public class RoutingTableProvider implements ExternalViewChangeListener, ConfigChangeListener
43 {
44 private static final Logger logger = Logger.getLogger(RoutingTableProvider.class);
45 private final AtomicReference<RoutingTable> _routingTableRef;
46
47 public RoutingTableProvider()
48 {
49 _routingTableRef = new AtomicReference<RoutingTableProvider.RoutingTable>(new RoutingTable());
50
51 }
52
53
54
55
56
57
58
59
60
61
62
63 public List<InstanceConfig> getInstances(String resourceName, String partitionName, String state)
64 {
65 List<InstanceConfig> instanceList = null;
66 RoutingTable _routingTable = _routingTableRef.get();
67 ResourceInfo resourceInfo = _routingTable.get(resourceName);
68 if (resourceInfo != null)
69 {
70 PartitionInfo keyInfo = resourceInfo.get(partitionName);
71 if (keyInfo != null)
72 {
73 instanceList = keyInfo.get(state);
74 }
75 }
76 if (instanceList == null)
77 {
78 instanceList = Collections.emptyList();
79 }
80 return instanceList;
81 }
82
83
84
85
86
87
88
89
90 public Set<InstanceConfig> getInstances(String resource, String state)
91 {
92 Set<InstanceConfig> instanceSet = null;
93 RoutingTable routingTable = _routingTableRef.get();
94 ResourceInfo resourceInfo = routingTable.get(resource);
95 if (resourceInfo != null)
96 {
97 instanceSet = resourceInfo.getInstances(state);
98 }
99 if (instanceSet == null)
100 {
101 instanceSet = Collections.emptySet();
102 }
103 return instanceSet;
104 }
105
106 @Override
107 public void onExternalViewChange(List<ExternalView> externalViewList,
108 NotificationContext changeContext)
109 {
110
111 if (changeContext.getType() == NotificationContext.Type.FINALIZE)
112 {
113 logger.info("Resetting the routing table. ");
114 RoutingTable newRoutingTable = new RoutingTable();
115 _routingTableRef.set(newRoutingTable);
116 return;
117 }
118 refresh(externalViewList, changeContext);
119 }
120
121 @Override
122 public void onConfigChange(List<InstanceConfig> configs,
123 NotificationContext changeContext)
124 {
125
126 if (changeContext.getType() == NotificationContext.Type.FINALIZE)
127 {
128 logger.info("Resetting the routing table. ");
129 RoutingTable newRoutingTable = new RoutingTable();
130 _routingTableRef.set(newRoutingTable);
131 return;
132 }
133
134 HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
135 Builder keyBuilder = accessor.keyBuilder();
136 List<ExternalView> externalViewList = accessor.getChildValues(keyBuilder.externalViews());
137 refresh(externalViewList, changeContext);
138 }
139
140 private void refresh(List<ExternalView> externalViewList, NotificationContext changeContext)
141 {
142 HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
143 Builder keyBuilder = accessor.keyBuilder();
144
145 List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs());
146 Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
147 for (InstanceConfig config : configList)
148 {
149 instanceConfigMap.put(config.getId(), config);
150 }
151 RoutingTable newRoutingTable = new RoutingTable();
152 if (externalViewList != null)
153 {
154 for (ExternalView extView : externalViewList)
155 {
156 String resourceName = extView.getId();
157 for (String partitionName : extView.getPartitionSet())
158 {
159 Map<String, String> stateMap = extView.getStateMap(partitionName);
160 for (String instanceName : stateMap.keySet())
161 {
162 String currentState = stateMap.get(instanceName);
163 if (instanceConfigMap.containsKey(instanceName))
164 {
165 InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
166 newRoutingTable.addEntry(resourceName, partitionName, currentState, instanceConfig);
167 } else
168 {
169 logger.error("Invalid instance name." + instanceName
170 + " .Not found in /cluster/configs/. instanceName: ");
171 }
172
173 }
174 }
175 }
176 }
177 _routingTableRef.set(newRoutingTable);
178 }
179
180 class RoutingTable
181 {
182 private final HashMap<String, ResourceInfo> resourceInfoMap;
183
184 public RoutingTable()
185 {
186 resourceInfoMap = new HashMap<String, RoutingTableProvider.ResourceInfo>();
187 }
188
189 public void addEntry(String resourceName, String partitionName, String state,
190 InstanceConfig config)
191 {
192 if (!resourceInfoMap.containsKey(resourceName))
193 {
194 resourceInfoMap.put(resourceName, new ResourceInfo());
195 }
196 ResourceInfo resourceInfo = resourceInfoMap.get(resourceName);
197 resourceInfo.addEntry(partitionName, state, config);
198
199 }
200
201 ResourceInfo get(String resourceName)
202 {
203 return resourceInfoMap.get(resourceName);
204 }
205
206 }
207
208 class ResourceInfo
209 {
210
211 HashMap<String, PartitionInfo> partitionInfoMap;
212
213 HashMap<String, Set<InstanceConfig>> stateInfoMap;
214
215 public ResourceInfo()
216 {
217 partitionInfoMap = new HashMap<String, RoutingTableProvider.PartitionInfo>();
218 stateInfoMap = new HashMap<String, Set<InstanceConfig>>();
219 }
220
221 public void addEntry(String stateUnitKey, String state, InstanceConfig config)
222 {
223
224 if (!stateInfoMap.containsKey(state))
225 {
226 Comparator<InstanceConfig> comparator = new Comparator<InstanceConfig>() {
227
228 @Override
229 public int compare(InstanceConfig o1, InstanceConfig o2)
230 {
231 if (o1 == o2)
232 {
233 return 0;
234 }
235 if (o1 == null)
236 {
237 return -1;
238 }
239 if (o2 == null)
240 {
241 return 1;
242 }
243
244 int compareTo = o1.getHostName().compareTo(o2.getHostName());
245 if (compareTo == 0)
246 {
247 return o1.getPort().compareTo(o2.getPort());
248 } else
249 {
250 return compareTo;
251 }
252
253 }
254 };
255 stateInfoMap.put(state, new TreeSet<InstanceConfig>(comparator));
256 }
257 Set<InstanceConfig> set = stateInfoMap.get(state);
258 set.add(config);
259
260 if (!partitionInfoMap.containsKey(stateUnitKey))
261 {
262 partitionInfoMap.put(stateUnitKey, new PartitionInfo());
263 }
264 PartitionInfo stateUnitKeyInfo = partitionInfoMap.get(stateUnitKey);
265 stateUnitKeyInfo.addEntry(state, config);
266
267 }
268
269 public Set<InstanceConfig> getInstances(String state)
270 {
271 Set<InstanceConfig> instanceSet = stateInfoMap.get(state);
272 return instanceSet;
273 }
274
275 PartitionInfo get(String stateUnitKey)
276 {
277 return partitionInfoMap.get(stateUnitKey);
278 }
279 }
280
281 class PartitionInfo
282 {
283 HashMap<String, List<InstanceConfig>> stateInfoMap;
284
285 public PartitionInfo()
286 {
287 stateInfoMap = new HashMap<String, List<InstanceConfig>>();
288 }
289
290 public void addEntry(String state, InstanceConfig config)
291 {
292 if (!stateInfoMap.containsKey(state))
293 {
294 stateInfoMap.put(state, new ArrayList<InstanceConfig>());
295 }
296 List<InstanceConfig> list = stateInfoMap.get(state);
297 list.add(config);
298 }
299
300 List<InstanceConfig> get(String state)
301 {
302 return stateInfoMap.get(state);
303 }
304 }
305 }