1 package org.apache.helix;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.TimeUnit;
33
34 import org.apache.helix.Mocks.MockAccessor;
35 import org.apache.helix.model.ExternalView;
36 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
37 import org.apache.helix.model.InstanceConfig;
38 import org.apache.helix.spectator.RoutingTableProvider;
39 import org.testng.AssertJUnit;
40 import org.testng.annotations.BeforeClass;
41 import org.testng.annotations.Test;
42
43
44 public class TestRoutingTable
45 {
46 NotificationContext changeContext = null;
47
48 @BeforeClass()
49 public synchronized void setup()
50 {
51
52 final String[] array = new String[] { "localhost_8900", "localhost_8901" };
53 HelixManager manager = new Mocks.MockManager() {
54 private MockAccessor _mockAccessor;
55
56 @Override
57
58 public HelixDataAccessor getHelixDataAccessor()
59 {
60 if (_mockAccessor == null)
61 {
62 _mockAccessor = new Mocks.MockAccessor() {
63 @SuppressWarnings("unchecked")
64 @Override
65 public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
66
67 {
68 PropertyType type = key.getType();
69 String[] keys = key.getParams();
70 if (type == PropertyType.CONFIGS && keys != null && keys.length > 1
71 && keys[1].equalsIgnoreCase(ConfigScopeProperty.PARTICIPANT.toString()))
72 {
73 List<InstanceConfig> configs = new ArrayList<InstanceConfig>();
74 for (String instanceName : array)
75 {
76 InstanceConfig config = new InstanceConfig(instanceName);
77 String[] splits = instanceName.split("_");
78 config.setHostName(splits[0]);
79 config.setPort(splits[1]);
80 configs.add(config);
81 }
82 return (List<T>) configs;
83 }
84 return Collections.emptyList();
85 };
86 };
87 }
88 return _mockAccessor;
89 }
90 };
91 changeContext = new NotificationContext(manager);
92 }
93
94 @Test()
95 public void testNullAndEmpty()
96 {
97
98 RoutingTableProvider routingTable = new RoutingTableProvider();
99 routingTable.onExternalViewChange(null, changeContext);
100 List<ExternalView> list = Collections.emptyList();
101 routingTable.onExternalViewChange(list, changeContext);
102
103 }
104
105 @Test()
106 public void testSimple()
107 {
108 List<InstanceConfig> instances;
109 RoutingTableProvider routingTable = new RoutingTableProvider();
110 ZNRecord record = new ZNRecord("TESTDB");
111
112
113 add(record, "TESTDB_0", "localhost_8900", "MASTER");
114 List<ExternalView> externalViewList = new ArrayList<ExternalView>();
115 externalViewList.add(new ExternalView(record));
116 routingTable.onExternalViewChange(externalViewList, changeContext);
117
118 instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
119 AssertJUnit.assertNotNull(instances);
120 AssertJUnit.assertEquals(instances.size(), 1);
121
122
123 add(record, "TESTDB_0", "localhost_8901", "MASTER");
124 add(record, "TESTDB_1", "localhost_8900", "SLAVE");
125
126 externalViewList = new ArrayList<ExternalView>();
127 externalViewList.add(new ExternalView(record));
128 routingTable.onExternalViewChange(externalViewList, changeContext);
129 instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
130 AssertJUnit.assertNotNull(instances);
131 AssertJUnit.assertEquals(instances.size(), 2);
132
133 instances = routingTable.getInstances("TESTDB", "TESTDB_1", "SLAVE");
134 AssertJUnit.assertNotNull(instances);
135 AssertJUnit.assertEquals(instances.size(), 1);
136
137
138 add(record, "TESTDB_0", "localhost_8901", "SLAVE");
139 externalViewList = new ArrayList<ExternalView>();
140 externalViewList.add(new ExternalView(record));
141 routingTable.onExternalViewChange(externalViewList, changeContext);
142 instances = routingTable.getInstances("TESTDB", "TESTDB_0", "SLAVE");
143 AssertJUnit.assertNotNull(instances);
144 AssertJUnit.assertEquals(instances.size(), 1);
145 }
146
147 @Test()
148 public void testStateUnitGroupDeletion()
149 {
150 List<InstanceConfig> instances;
151 RoutingTableProvider routingTable = new RoutingTableProvider();
152
153 List<ExternalView> externalViewList = new ArrayList<ExternalView>();
154 ZNRecord record = new ZNRecord("TESTDB");
155
156
157 add(record, "TESTDB_0", "localhost_8900", "MASTER");
158 externalViewList.add(new ExternalView(record));
159 routingTable.onExternalViewChange(externalViewList, changeContext);
160 instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
161 AssertJUnit.assertNotNull(instances);
162 AssertJUnit.assertEquals(instances.size(), 1);
163
164 externalViewList.clear();
165 routingTable.onExternalViewChange(externalViewList, changeContext);
166 instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
167 AssertJUnit.assertNotNull(instances);
168 AssertJUnit.assertEquals(instances.size(), 0);
169 }
170
171 @Test()
172 public void testGetInstanceForAllStateUnits()
173 {
174 List<InstanceConfig> instancesList;
175 Set<InstanceConfig> instancesSet;
176 InstanceConfig instancesArray[];
177 RoutingTableProvider routingTable = new RoutingTableProvider();
178 List<ExternalView> externalViewList = new ArrayList<ExternalView>();
179 ZNRecord record = new ZNRecord("TESTDB");
180
181
182 add(record, "TESTDB_0", "localhost_8900", "MASTER");
183 add(record, "TESTDB_1", "localhost_8900", "MASTER");
184 add(record, "TESTDB_2", "localhost_8900", "MASTER");
185 add(record, "TESTDB_3", "localhost_8900", "SLAVE");
186 add(record, "TESTDB_4", "localhost_8900", "SLAVE");
187 add(record, "TESTDB_5", "localhost_8900", "SLAVE");
188
189 add(record, "TESTDB_0", "localhost_8901", "SLAVE");
190 add(record, "TESTDB_1", "localhost_8901", "SLAVE");
191 add(record, "TESTDB_2", "localhost_8901", "SLAVE");
192 add(record, "TESTDB_3", "localhost_8901", "MASTER");
193 add(record, "TESTDB_4", "localhost_8901", "MASTER");
194 add(record, "TESTDB_5", "localhost_8901", "MASTER");
195
196 externalViewList.add(new ExternalView(record));
197 routingTable.onExternalViewChange(externalViewList, changeContext);
198 instancesList = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
199 AssertJUnit.assertNotNull(instancesList);
200 AssertJUnit.assertEquals(instancesList.size(), 1);
201 instancesSet = routingTable.getInstances("TESTDB", "MASTER");
202 AssertJUnit.assertNotNull(instancesSet);
203 AssertJUnit.assertEquals(instancesSet.size(), 2);
204 instancesSet = routingTable.getInstances("TESTDB", "SLAVE");
205 AssertJUnit.assertNotNull(instancesSet);
206 AssertJUnit.assertEquals(instancesSet.size(), 2);
207 instancesArray = new InstanceConfig[instancesSet.size()];
208 instancesSet.toArray(instancesArray);
209 AssertJUnit.assertEquals(instancesArray[0].getHostName(), "localhost");
210 AssertJUnit.assertEquals(instancesArray[0].getPort(), "8900");
211 AssertJUnit.assertEquals(instancesArray[1].getHostName(), "localhost");
212 AssertJUnit.assertEquals(instancesArray[1].getPort(), "8901");
213 }
214
215 @Test()
216 public void testMultiThread() throws Exception
217 {
218 final RoutingTableProvider routingTable = new RoutingTableProvider();
219 List<ExternalView> externalViewList = new ArrayList<ExternalView>();
220 ZNRecord record = new ZNRecord("TESTDB");
221 for (int i = 0; i < 1000; i++)
222 {
223 add(record, "TESTDB_" + i, "localhost_8900", "MASTER");
224 }
225 externalViewList.add(new ExternalView(record));
226 routingTable.onExternalViewChange(externalViewList, changeContext);
227 Callable<Boolean> runnable = new Callable<Boolean>() {
228 @Override
229 public Boolean call() throws Exception
230 {
231
232 try
233 {
234 int count = 0;
235 while (count < 100)
236 {
237 List<InstanceConfig> instancesList = routingTable.getInstances("TESTDB", "TESTDB_0",
238 "MASTER");
239 AssertJUnit.assertEquals(instancesList.size(), 1);
240
241
242
243 Thread.sleep(5);
244
245 count++;
246 }
247 } catch (InterruptedException e)
248 {
249
250 }
251 return true;
252 }
253 };
254 ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
255 Future<Boolean> submit = executor.submit(runnable);
256 int count = 0;
257 while (count < 10)
258 {
259 try
260 {
261 Thread.sleep(10);
262 } catch (InterruptedException e)
263 {
264 e.printStackTrace();
265 }
266 routingTable.onExternalViewChange(externalViewList, changeContext);
267 count++;
268 }
269
270 Boolean result = submit.get(60, TimeUnit.SECONDS);
271 AssertJUnit.assertEquals(result, Boolean.TRUE);
272
273 }
274
275 private void add(ZNRecord record, String stateUnitKey, String instanceName, String state)
276 {
277 Map<String, String> stateUnitKeyMap = record.getMapField(stateUnitKey);
278 if (stateUnitKeyMap == null)
279 {
280 stateUnitKeyMap = new HashMap<String, String>();
281 record.setMapField(stateUnitKey, stateUnitKeyMap);
282 }
283 stateUnitKeyMap.put(instanceName, state);
284 record.setMapField(stateUnitKey, stateUnitKeyMap);
285 }
286 }