1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.List;
25
26 import org.I0Itec.zkclient.DataUpdater;
27 import org.apache.helix.AccessOption;
28 import org.apache.helix.BaseDataAccessor;
29 import org.apache.helix.PropertyPathConfig;
30 import org.apache.helix.PropertyType;
31 import org.apache.helix.ZNRecord;
32 import org.apache.helix.ZNRecordUpdater;
33 import org.apache.helix.ZkUnitTestBase;
34 import org.apache.helix.manager.zk.ZNRecordSerializer;
35 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
36 import org.apache.helix.manager.zk.ZkClient;
37 import org.apache.zookeeper.data.Stat;
38 import org.testng.Assert;
39 import org.testng.annotations.Test;
40
41
42 public class TestZkBaseDataAccessor extends ZkUnitTestBase
43 {
44 @Test
45 public void testSyncZkBaseDataAccessor()
46 {
47 System.out.println("START TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
48
49 String root = "TestZkBaseDataAccessor_syn";
50 ZkClient zkClient = new ZkClient(ZK_ADDR);
51 zkClient.setZkSerializer(new ZNRecordSerializer());
52 zkClient.deleteRecursive("/" + root);
53
54 BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
55
56
57 for (int i = 0; i < 10; i++)
58 {
59 String msgId = "msg_" + i;
60 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
61 boolean success = accessor.create(path, new ZNRecord(msgId), AccessOption.PERSISTENT);
62 Assert.assertTrue(success, "Should succeed in create");
63 }
64
65
66 for (int i = 0; i < 10; i++)
67 {
68 String msgId = "msg_" + i;
69 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
70 ZNRecord record = zkClient.readData(path);
71 Assert.assertEquals(record.getId(), msgId, "Should get what we created");
72 }
73
74
75 for (int i = 0; i < 10; i++)
76 {
77 String msgId = "msg_" + i;
78 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
79 ZNRecord newRecord = new ZNRecord(msgId);
80 newRecord.setSimpleField("key1", "value1");
81 boolean success = accessor.set(path, newRecord, AccessOption.PERSISTENT);
82 Assert.assertTrue(success, "Should succeed in set");
83 }
84
85
86 for (int i = 0; i < 10; i++)
87 {
88 String msgId = "msg_" + i;
89 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
90 ZNRecord record = zkClient.readData(path);
91 Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field set");
92 Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
93 }
94
95
96 for (int i = 0; i < 10; i++)
97 {
98 String msgId = "msg_" + i;
99 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
100 ZNRecord newRecord = new ZNRecord(msgId);
101 newRecord.setSimpleField("key2", "value2");
102 boolean success = accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
103 Assert.assertTrue(success, "Should succeed in update");
104 }
105
106
107 for (int i = 0; i < 10; i++)
108 {
109 String msgId = "msg_" + i;
110 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
111 ZNRecord record = zkClient.readData(path);
112 Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
113 Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
114 }
115
116
117 for (int i = 0; i < 10; i++)
118 {
119 String msgId = "msg_" + i;
120 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
121 ZNRecord record = accessor.get(path, null, 0);
122 Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
123 Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
124 Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
125 }
126
127
128 for (int i = 0; i < 10; i++)
129 {
130 String msgId = "msg_" + i;
131 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
132 boolean exists = accessor.exists(path, 0);
133 Assert.assertTrue(exists, "Should exist");
134 }
135
136
137 for (int i = 0; i < 10; i++)
138 {
139 String msgId = "msg_" + i;
140 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
141 Stat stat = accessor.getStat(path, 0);
142 Assert.assertNotNull(stat, "Stat should exist");
143 Assert.assertEquals(stat.getVersion(), 2, "DataVersion should be 2, since we set 1 and update 1");
144 }
145
146
147 for (int i = 0; i < 10; i++)
148 {
149 String msgId = "msg_" + i;
150 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
151 boolean success = accessor.remove(path, 0);
152 Assert.assertTrue(success, "Should remove");
153 }
154
155
156 for (int i = 0; i < 10; i++)
157 {
158 String msgId = "msg_" + i;
159 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
160 boolean exists = zkClient.exists(path);
161 Assert.assertFalse(exists, "Should be removed");
162 }
163
164 zkClient.close();
165 System.out.println("END TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
166 }
167
168 @Test
169 public void testAsyncZkBaseDataAccessor()
170 {
171 System.out.println("START TestZkBaseDataAccessor.async at " + new Date(System.currentTimeMillis()));
172
173 String root = "TestZkBaseDataAccessor_asyn";
174 ZkClient zkClient = new ZkClient(ZK_ADDR);
175 zkClient.setZkSerializer(new ZNRecordSerializer());
176 zkClient.deleteRecursive("/" + root);
177
178 ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
179
180
181 String parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
182 List<ZNRecord> records = new ArrayList<ZNRecord>();
183 List<String> paths = new ArrayList<String>();
184 for (int i = 0; i < 10; i++)
185 {
186 String msgId = "msg_" + i;
187 paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
188 records.add(new ZNRecord(msgId));
189 }
190 boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
191 for (int i = 0; i < 10; i++)
192 {
193 String msgId = "msg_" + i;
194 Assert.assertTrue(success[i], "Should succeed in create " + msgId);
195 }
196
197
198 for (int i = 0; i < 10; i++)
199 {
200 String msgId = "msg_" + i;
201 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
202 ZNRecord record = zkClient.readData(path);
203 Assert.assertEquals(record.getId(), msgId, "Should get what we created");
204 }
205
206
207 parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
208 records = new ArrayList<ZNRecord>();
209 paths = new ArrayList<String>();
210 for (int i = 0; i < 10; i++)
211 {
212 String msgId = "msg_" + i;
213 paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
214 ZNRecord newRecord = new ZNRecord(msgId);
215 newRecord.setSimpleField("key1", "value1");
216 records.add(newRecord);
217 }
218 success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
219 for (int i = 0; i < 10; i++)
220 {
221 String msgId = "msg_" + i;
222 Assert.assertTrue(success[i], "Should succeed in set " + msgId);
223 }
224
225
226 for (int i = 0; i < 10; i++)
227 {
228 String msgId = "msg_" + i;
229 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
230 ZNRecord record = zkClient.readData(path);
231 Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field set");
232 Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
233 }
234
235
236 parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
237
238 List<DataUpdater<ZNRecord>> znrecordUpdaters = new ArrayList<DataUpdater<ZNRecord>>();
239 paths = new ArrayList<String>();
240 for (int i = 0; i < 10; i++)
241 {
242 String msgId = "msg_" + i;
243 paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
244 ZNRecord newRecord = new ZNRecord(msgId);
245 newRecord.setSimpleField("key2", "value2");
246
247 znrecordUpdaters.add(new ZNRecordUpdater(newRecord));
248 }
249 success = accessor.updateChildren(paths, znrecordUpdaters, AccessOption.PERSISTENT);
250 for (int i = 0; i < 10; i++)
251 {
252 String msgId = "msg_" + i;
253 Assert.assertTrue(success[i], "Should succeed in update " + msgId);
254 }
255
256
257 for (int i = 0; i < 10; i++)
258 {
259 String msgId = "msg_" + i;
260 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
261 ZNRecord record = zkClient.readData(path);
262 Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
263 Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
264 }
265
266
267 parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
268 records = accessor.getChildren(parentPath, null, 0);
269 for (int i = 0; i < 10; i++)
270 {
271 String msgId = "msg_" + i;
272 ZNRecord record = records.get(i);
273 Assert.assertEquals(record.getId(), msgId, "Should get what we updated");
274 Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
275 Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
276 }
277
278
279 parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
280 paths = new ArrayList<String>();
281 for (int i = 0; i < 10; i++)
282 {
283 String msgId = "msg_" + i;
284 paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
285 }
286 boolean[] exists = accessor.exists(paths, 0);
287 for (int i = 0; i < 10; i++)
288 {
289 String msgId = "msg_" + i;
290 Assert.assertTrue(exists[i], "Should exist " + msgId);
291 }
292
293
294 parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
295 paths = new ArrayList<String>();
296 for (int i = 0; i < 10; i++)
297 {
298 String msgId = "msg_" + i;
299 paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
300 }
301 Stat[] stats = accessor.getStats(paths, 0);
302 for (int i = 0; i < 10; i++)
303 {
304 String msgId = "msg_" + i;
305 Assert.assertNotNull(stats[i], "Stat should exist for " + msgId);
306 Assert.assertEquals(stats[i].getVersion(), 2, "DataVersion should be 2, since we set 1 and update 1 for " + msgId);
307 }
308
309
310 parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
311 paths = new ArrayList<String>();
312 for (int i = 0; i < 10; i++)
313 {
314 String msgId = "msg_" + i;
315 paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
316 }
317 success = accessor.remove(paths, 0);
318 for (int i = 0; i < 10; i++)
319 {
320 String msgId = "msg_" + i;
321 Assert.assertTrue(success[i], "Should succeed in remove " + msgId);
322 }
323
324
325 for (int i = 0; i < 10; i++)
326 {
327 String msgId = "msg_" + i;
328 String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
329 boolean pathExists = zkClient.exists(path);
330 Assert.assertFalse(pathExists, "Should be removed " + msgId);
331 }
332
333 zkClient.close();
334 System.out.println("END TestZkBaseDataAccessor.async at " + new Date(System.currentTimeMillis()));
335 }
336
337 }