1 package org.apache.helix.store.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.store.HelixPropertyListener;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.testng.Assert;
import org.testng.annotations.Test;
40
41
42 public class TestZkHelixPropertyStore extends ZkUnitTestBase
43 {
// Root path for this test class ("/" + short class name); each test appends its own sub-path.
44 final String _root = "/" + getShortClassName();
// Length of the character buffer that setNodes() turns into every simple-field value.
45 final int bufSize = 128;
// Number of "key_<n>" simple-field entries setNodes() puts into each record.
46 final int mapNr = 10;
// Number of first-level "/node_<i>" entries written by setNodes().
47 final int firstLevelNr = 10;
// Number of second-level "childNode_<i>_<j>" entries written under each first-level node.
48 final int secondLevelNr = 10;
49
50
51
52 class TestListener implements HelixPropertyListener
53 {
54 Map<String, Long> _changeKeys = new HashMap<String, Long>();
55 Map<String, Long> _createKeys = new HashMap<String, Long>();
56 Map<String, Long> _deleteKeys = new HashMap<String, Long>();
57
58 public void reset()
59 {
60 _changeKeys.clear();
61 _createKeys.clear();
62 _deleteKeys.clear();
63 }
64
65 @Override
66 public void onDataChange(String path)
67 {
68 _changeKeys.put(path, new Long(System.currentTimeMillis()));
69 }
70
71 @Override
72 public void onDataCreate(String path)
73 {
74 _createKeys.put(path, new Long(System.currentTimeMillis()));
75 }
76
77 @Override
78 public void onDataDelete(String path)
79 {
80 _deleteKeys.put(path, new Long(System.currentTimeMillis()));
81 }
82 }
83
84 @Test
85 public void testSet()
86 {
87
88
89 System.out.println("START testSet() at " + new Date(System.currentTimeMillis()));
90
91 String subRoot = _root + "/" + "set";
92 List<String> subscribedPaths = new ArrayList<String>();
93 subscribedPaths.add(subRoot);
94 ZkHelixPropertyStore<ZNRecord> store =
95 new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
96 subRoot,
97 subscribedPaths);
98
99
100 setNodes(store, 'a', false);
101 for (int i = 0; i < 10; i++)
102 {
103 for (int j = 0; j < 10; j++)
104 {
105 String nodeId = getNodeId(i, j);
106 String key = getSecondLevelKey(i, j);
107 ZNRecord record = store.get(key, null, 0);
108 Assert.assertEquals(record.getId(), nodeId);
109 }
110 }
111
112
113 long startT = System.currentTimeMillis();
114 for (int i = 0; i < 1000; i++)
115 {
116 ZNRecord record = store.get("/node_0/childNode_0_0", null, 0);
117 Assert.assertNotNull(record);
118 }
119 long endT = System.currentTimeMillis();
120 System.out.println("1000 Get() time used: " + (endT - startT) + "ms");
121 long latency = endT - startT;
122 Assert.assertTrue(latency < 200, "1000 Gets should be finished within 200ms, but was " + latency + " ms");
123
124 store.stop();
125 System.out.println("END testSet() at " + new Date(System.currentTimeMillis()));
126 }
127
128 @Test
129 public void testSetInvalidPath()
130 {
131 String subRoot = _root + "/" + "setInvalidPath";
132 ZkHelixPropertyStore<ZNRecord> store =
133 new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
134 subRoot,
135 null);
136 try {
137 store.set("abc/xyz", new ZNRecord("testInvalid"), AccessOption.PERSISTENT);
138 Assert.fail("Should throw illegal-argument-exception since path doesn't start with /");
139 } catch (IllegalArgumentException e) {
140
141
142 } catch (Exception e) {
143 Assert.fail("Should not throw exceptions other than illegal-argument");
144 }
145 }
146
147 @Test
148 public void testLocalTriggeredCallback() throws Exception
149 {
150 System.out.println("START testLocalTriggeredCallback() at "
151 + new Date(System.currentTimeMillis()));
152
153 String subRoot = _root + "/" + "localCallback";
154 List<String> subscribedPaths = new ArrayList<String>();
155 subscribedPaths.add(subRoot);
156 ZkHelixPropertyStore<ZNRecord> store =
157 new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
158 subRoot,
159 subscribedPaths);
160
161
162
163 TestListener listener = new TestListener();
164 store.subscribe("/", listener);
165
166
167 listener.reset();
168 setNodes(store, 'a', true);
169
170
171 Thread.sleep(500);
172 int expectCreateNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
173 System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
174 + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
175 Assert.assertTrue(listener._createKeys.size() == expectCreateNodes, "Should receive "
176 + expectCreateNodes + " create callbacks");
177
178
179 listener.reset();
180 setNodes(store, 'b', true);
181
182
183 Thread.sleep(500);
184 int expectChangeNodes = firstLevelNr * secondLevelNr;
185 System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
186 + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
187 Assert.assertTrue(listener._changeKeys.size() >= expectChangeNodes,
188 "Should receive at least " + expectChangeNodes
189 + " change callbacks");
190
191
192 listener.reset();
193 int expectDeleteNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
194 store.remove("/", 0);
195
196 for (int i = 0; i < 10; i++) {
197 if (listener._deleteKeys.size() == expectDeleteNodes)
198 break;
199 Thread.sleep(500);
200 }
201
202 System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
203 + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
204 Assert.assertTrue(listener._deleteKeys.size() == expectDeleteNodes, "Should receive "
205 + expectDeleteNodes + " delete callbacks");
206
207 store.stop();
208 System.out.println("END testLocalTriggeredCallback() at "
209 + new Date(System.currentTimeMillis()));
210 }
211
212 @Test
213 public void testZkTriggeredCallback() throws Exception
214 {
215 System.out.println("START testZkTriggeredCallback() at "
216 + new Date(System.currentTimeMillis()));
217
218 String subRoot = _root + "/" + "zkCallback";
219 List<String> subscribedPaths = Arrays.asList(subRoot);
220 ZkHelixPropertyStore<ZNRecord> store =
221 new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
222 subRoot,
223 subscribedPaths);
224
225
226
227 TestListener listener = new TestListener();
228 store.subscribe("/", listener);
229
230
231 listener.reset();
232 setNodes(_gZkClient, subRoot, 'a', true);
233 int expectCreateNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
234 Thread.sleep(500);
235
236 System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
237 + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
238 Assert.assertTrue(listener._createKeys.size() == expectCreateNodes, "Should receive "
239 + expectCreateNodes + " create callbacks");
240
241
242 listener.reset();
243 setNodes(_gZkClient, subRoot, 'b', true);
244 int expectChangeNodes = firstLevelNr * secondLevelNr;
245 for (int i = 0; i < 10; i++) {
246 if (listener._changeKeys.size() >= expectChangeNodes)
247 break;
248 Thread.sleep(500);
249 }
250
251 System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
252 + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
253 Assert.assertTrue(listener._changeKeys.size() >= expectChangeNodes,
254 "Should receive at least " + expectChangeNodes
255 + " change callbacks");
256
257
258 listener.reset();
259 int expectDeleteNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
260 _gZkClient.deleteRecursive(subRoot);
261 Thread.sleep(1000);
262
263 System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
264 + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
265 Assert.assertTrue(listener._deleteKeys.size() == expectDeleteNodes, "Should receive "
266 + expectDeleteNodes + " delete callbacks");
267
268 store.stop();
269 System.out.println("END testZkTriggeredCallback() at "
270 + new Date(System.currentTimeMillis()));
271 }
272
273 @Test
274 public void testBackToBackRemoveAndSet() throws Exception
275 {
276 System.out.println("START testBackToBackRemoveAndSet() at "
277 + new Date(System.currentTimeMillis()));
278
279 String subRoot = _root + "/" + "backToBackRemoveAndSet";
280 List<String> subscribedPaths = new ArrayList<String>();
281 subscribedPaths.add(subRoot);
282 ZkHelixPropertyStore<ZNRecord> store =
283 new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
284 subRoot,
285 subscribedPaths);
286
287 store.set("/child0", new ZNRecord("child0"), AccessOption.PERSISTENT);
288
289 ZNRecord record = store.get("/child0", null, 0);
290 Assert.assertEquals(record.getId(), "child0");
291
292
293 String child0Path = subRoot + "/child0";
294 for (int i = 0; i < 2; i++)
295 {
296 _gZkClient.delete(child0Path);
297 _gZkClient.createPersistent(child0Path, new ZNRecord("child0-new-" + i));
298 }
299
300 Thread.sleep(500);
301 record = store.get("/child0", null, 0);
302 Assert.assertEquals(record.getId(),
303 "child0-new-1",
304 "Cache shoulde be updated to latest create");
305
306
307 _gZkClient.delete(child0Path);
308 Thread.sleep(500);
309 try
310 {
311 record = store.get("/child0", null, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
312 Assert.fail("/child0 should have been removed");
313 }
314 catch (ZkNoNodeException e)
315 {
316
317 }
318
319
320 store.stop();
321 System.out.println("END testBackToBackRemoveAndSet() at "
322 + new Date(System.currentTimeMillis()));
323 }
324
325 private String getNodeId(int i, int j)
326 {
327 return "childNode_" + i + "_" + j;
328 }
329
330 private String getSecondLevelKey(int i, int j)
331 {
332 return "/node_" + i + "/" + getNodeId(i, j);
333 }
334
335 private String getFirstLevelKey(int i)
336 {
337 return "/node_" + i;
338 }
339
340 private void setNodes(ZkHelixPropertyStore<ZNRecord> store,
341 char c,
342 boolean needTimestamp)
343 {
344 char[] data = new char[bufSize];
345
346 for (int i = 0; i < bufSize; i++)
347 {
348 data[i] = c;
349 }
350
351 Map<String, String> map = new TreeMap<String, String>();
352 for (int i = 0; i < mapNr; i++)
353 {
354 map.put("key_" + i, new String(data));
355 }
356
357 for (int i = 0; i < firstLevelNr; i++)
358 {
359 for (int j = 0; j < secondLevelNr; j++)
360 {
361 String nodeId = getNodeId(i, j);
362 ZNRecord record = new ZNRecord(nodeId);
363 record.setSimpleFields(map);
364 if (needTimestamp)
365 {
366 long now = System.currentTimeMillis();
367 record.setSimpleField("SetTimestamp", Long.toString(now));
368 }
369 String key = getSecondLevelKey(i, j);
370 store.set(key, record, AccessOption.PERSISTENT);
371 }
372 }
373 }
374
375 private void setNodes(ZkClient zkClient, String root, char c, boolean needTimestamp)
376 {
377 char[] data = new char[bufSize];
378
379 for (int i = 0; i < bufSize; i++)
380 {
381 data[i] = c;
382 }
383
384 Map<String, String> map = new TreeMap<String, String>();
385 for (int i = 0; i < mapNr; i++)
386 {
387 map.put("key_" + i, new String(data));
388 }
389
390 for (int i = 0; i < firstLevelNr; i++)
391 {
392 String firstLevelKey = getFirstLevelKey(i);
393
394 for (int j = 0; j < secondLevelNr; j++)
395 {
396 String nodeId = getNodeId(i, j);
397 ZNRecord record = new ZNRecord(nodeId);
398 record.setSimpleFields(map);
399 if (needTimestamp)
400 {
401 long now = System.currentTimeMillis();
402 record.setSimpleField("SetTimestamp", Long.toString(now));
403 }
404 String key = getSecondLevelKey(i, j);
405 try
406 {
407 zkClient.writeData(root + key, record);
408 }
409 catch (ZkNoNodeException e)
410 {
411 zkClient.createPersistent(root + firstLevelKey, true);
412 zkClient.createPersistent(root + key, record);
413 }
414 }
415 }
416 }
417
418 }