1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Date;
25 import java.util.List;
26
27 import org.I0Itec.zkclient.DataUpdater;
28 import org.apache.helix.AccessOption;
29 import org.apache.helix.PropertyPathConfig;
30 import org.apache.helix.PropertyType;
31 import org.apache.helix.TestHelper;
32 import org.apache.helix.ZNRecord;
33 import org.apache.helix.ZNRecordUpdater;
34 import org.apache.helix.ZkUnitTestBase;
35 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
36 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
37 import org.testng.Assert;
38 import org.testng.annotations.Test;
39
40
41 public class TestWtCacheAsyncOpSingleThread extends ZkUnitTestBase
42 {
43 @Test
44 public void testHappyPathZkCacheBaseDataAccessor()
45 {
46 String className = TestHelper.getTestClassName();
47 String methodName = TestHelper.getTestMethodName();
48 String clusterName = className + "_" + methodName;
49 System.out.println("START " + clusterName + " at "
50 + new Date(System.currentTimeMillis()));
51
52
53 String curStatePath =
54 PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
55 clusterName,
56 "localhost_8901");
57 String extViewPath =
58 PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
59
60 ZkBaseDataAccessor<ZNRecord> baseAccessor =
61 new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
62
63 baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
64
65 List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
66 ZkCacheBaseDataAccessor<ZNRecord> accessor =
67 new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
68 null,
69 cachePaths,
70 null);
71
72 boolean ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
73 Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
74
75
76
77 List<String> paths = new ArrayList<String>();
78 List<ZNRecord> records = new ArrayList<ZNRecord>();
79 for (int i = 0; i < 10; i++)
80 {
81 String path =
82 PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
83 clusterName,
84 "localhost_8901",
85 "session_0",
86 "TestDB" + i);
87 ZNRecord record = new ZNRecord("TestDB" + i);
88
89 paths.add(path);
90 records.add(record);
91 }
92
93 boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
94 for (int i = 0; i < 10; i++)
95 {
96 Assert.assertTrue(success[i], "Should succeed in create: " + paths.get(i));
97 }
98
99
100
101 ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
102 Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
103
104
105
106 List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
107 for (int j = 0; j < 10; j++)
108 {
109 paths.clear();
110 updaters.clear();
111 for (int i = 0; i < 10; i++)
112 {
113 String path = curStatePath + "/session_0/TestDB" + i;
114 ZNRecord newRecord = new ZNRecord("TestDB" + i);
115 newRecord.setSimpleField("" + j, "" + j);
116 DataUpdater<ZNRecord> updater = new ZNRecordUpdater(newRecord);
117 paths.add(path);
118 updaters.add(updater);
119 }
120 success = accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
121
122 for (int i = 0; i < 10; i++)
123 {
124 Assert.assertTrue(success[i], "Should succeed in update: " + paths.get(i));
125 }
126 }
127
128
129
130 ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
131 Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
132
133
134
135 paths.clear();
136 records.clear();
137 for (int i = 0; i < 10; i++)
138 {
139 String path =
140 PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
141 ZNRecord record = new ZNRecord("TestDB" + i);
142
143 paths.add(path);
144 records.add(record);
145 }
146 success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
147 for (int i = 0; i < 10; i++)
148 {
149 Assert.assertTrue(success[i], "Should succeed in set: " + paths.get(i));
150 }
151
152
153
154 ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
155 Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
156
157
158
159 paths.clear();
160 records.clear();
161 for (int i = 0; i < 10; i++)
162 {
163 String path =
164 PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
165 paths.add(path);
166 }
167
168 records = accessor.get(paths, null, 0);
169 for (int i = 0; i < 10; i++)
170 {
171 Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
172 }
173
174
175 records.clear();
176 records = accessor.getChildren(extViewPath, null, 0);
177 for (int i = 0; i < 10; i++)
178 {
179 Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
180 }
181
182
183 paths.clear();
184 for (int i = 0; i < 10; i++)
185 {
186 String path =
187 PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
188 clusterName,
189 "localhost_8901",
190 "session_0",
191 "TestDB" + i);
192 paths.add(path);
193 }
194 success = accessor.exists(paths, 0);
195 for (int i = 0; i < 10; i++)
196 {
197 Assert.assertTrue(success[i], "Should exits: TestDB" + i);
198 }
199
200 System.out.println("END " + clusterName + " at "
201 + new Date(System.currentTimeMillis()));
202 }
203
204 @Test
205 public void testCreateFailZkCacheBaseAccessor()
206 {
207 String className = TestHelper.getTestClassName();
208 String methodName = TestHelper.getTestMethodName();
209 String clusterName = className + "_" + methodName;
210 System.out.println("START " + clusterName + " at "
211 + new Date(System.currentTimeMillis()));
212
213
214 String curStatePath =
215 PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
216 clusterName,
217 "localhost_8901");
218 String extViewPath =
219 PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
220
221 ZkBaseDataAccessor<ZNRecord> baseAccessor =
222 new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
223
224 baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
225
226 ZkCacheBaseDataAccessor<ZNRecord> accessor =
227 new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
228 null,
229 Arrays.asList(curStatePath, extViewPath),
230 null);
231
232 Assert.assertEquals(accessor._wtCache._cache.size(), 1, "Should contain only:\n"
233 + curStatePath);
234 Assert.assertTrue(accessor._wtCache._cache.containsKey(curStatePath));
235
236
237 List<String> paths = new ArrayList<String>();
238 List<ZNRecord> records = new ArrayList<ZNRecord>();
239 for (int i = 0; i < 10; i++)
240 {
241 String path =
242 PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
243 clusterName,
244 "localhost_8901",
245 "session_1",
246 "TestDB" + i);
247 ZNRecord record = new ZNRecord("TestDB" + i);
248
249 paths.add(path);
250 records.add(record);
251 }
252
253 boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
254 for (int i = 0; i < 10; i++)
255 {
256 Assert.assertTrue(success[i], "Should succeed in create: " + paths.get(i));
257 }
258
259
260
261 success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
262
263 for (int i = 0; i < 10; i++)
264 {
265 Assert.assertFalse(success[i], "Should fail on create: " + paths.get(i));
266 }
267
268 System.out.println("END " + clusterName + " at "
269 + new Date(System.currentTimeMillis()));
270
271 }
272 }