1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Date;
25 import java.util.List;
26 import java.util.concurrent.Callable;
27
28 import org.I0Itec.zkclient.DataUpdater;
29 import org.apache.helix.AccessOption;
30 import org.apache.helix.PropertyPathConfig;
31 import org.apache.helix.PropertyType;
32 import org.apache.helix.TestHelper;
33 import org.apache.helix.ZNRecord;
34 import org.apache.helix.ZNRecordUpdater;
35 import org.apache.helix.ZkUnitTestBase;
36 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
37 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
38 import org.testng.Assert;
39 import org.testng.annotations.Test;
40
41
42 public class TestWtCacheAsyncOpMultiThread extends ZkUnitTestBase
43 {
44 class TestCreateZkCacheBaseDataAccessor implements Callable<Boolean>
45 {
46 final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
47 final String _clusterName;
48 final int _id;
49
50 public TestCreateZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
51 {
52 _accessor = accessor;
53 _clusterName = clusterName;
54 _id = id;
55 }
56
57 @Override
58 public Boolean call() throws Exception
59 {
60
61 List<String> paths = new ArrayList<String>();
62 List<ZNRecord> records = new ArrayList<ZNRecord>();
63 for (int j = 0; j < 2; j++)
64 {
65 paths.clear();
66 records.clear();
67
68 if (_id == 1 && j == 0)
69 {
70
71 Thread.sleep(30);
72 }
73
74 if (_id == 0 && j == 1)
75 {
76
77 Thread.sleep(100);
78 }
79
80
81 for (int i = 0; i < 5; i++)
82 {
83 int k = j * 5 + i;
84 String path =
85 PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
86 _clusterName,
87 "localhost_8901",
88 "session_0",
89 "TestDB" + k);
90 ZNRecord record = new ZNRecord("TestDB" + k);
91
92 paths.add(path);
93 records.add(record);
94 }
95
96 boolean[] success = _accessor.createChildren(paths, records, AccessOption.PERSISTENT);
97
98
99
100 for (int i = 1; i < 5; i++)
101 {
102 Assert.assertEquals(success[i], success[0], "Should be either all succeed of all fail");
103 }
104 }
105
106 return true;
107 }
108 }
109
110 class TestUpdateZkCacheBaseDataAccessor implements Callable<Boolean>
111 {
112 final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
113 final String _clusterName;
114 final int _id;
115
116 public TestUpdateZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
117 {
118 _accessor = accessor;
119 _clusterName = clusterName;
120 _id = id;
121 }
122
123 @Override
124 public Boolean call() throws Exception
125 {
126
127 List<String> paths = new ArrayList<String>();
128 List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
129 for (int j = 0; j < 10; j++)
130 {
131 paths.clear();
132 updaters.clear();
133
134 for (int i = 0; i < 10; i++)
135 {
136 String path =
137 PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
138 _clusterName,
139 "localhost_8901",
140 "session_0",
141 "TestDB" + i);
142
143 ZNRecord newRecord = new ZNRecord("TestDB" + i);
144 newRecord.setSimpleField("" + j, "" + j);
145 DataUpdater<ZNRecord> updater = new ZNRecordUpdater(newRecord);
146 paths.add(path);
147 updaters.add(updater);
148 }
149
150 boolean[] success = _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
151
152
153 for (int i = 0; i < 10; i++)
154 {
155 Assert.assertEquals(success[i], true, "Should be all succeed");
156 }
157 }
158
159 return true;
160 }
161 }
162
163 class TestSetZkCacheBaseDataAccessor implements Callable<Boolean>
164 {
165 final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
166 final String _clusterName;
167 final int _id;
168
169 public TestSetZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
170 {
171 _accessor = accessor;
172 _clusterName = clusterName;
173 _id = id;
174 }
175
176 @Override
177 public Boolean call() throws Exception
178 {
179
180 List<String> paths = new ArrayList<String>();
181 List<ZNRecord> records = new ArrayList<ZNRecord>();
182 for (int j = 0; j < 2; j++)
183 {
184 paths.clear();
185 records.clear();
186
187 if (_id == 1 && j == 0)
188 {
189
190 Thread.sleep(30);
191 }
192
193 if (_id == 0 && j == 1)
194 {
195
196 Thread.sleep(100);
197 }
198
199 for (int i = 0; i < 5; i++)
200 {
201 int k = j * 5 + i;
202 String path =
203 PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName, "TestDB" + k);
204 ZNRecord record = new ZNRecord("TestDB" + k);
205
206 paths.add(path);
207 records.add(record);
208 }
209 boolean[] success = _accessor.setChildren(paths, records, AccessOption.PERSISTENT);
210
211
212 for (int i = 0; i < 5; i++)
213 {
214 Assert.assertEquals(success[i], true);
215 }
216 }
217
218 return true;
219 }
220 }
221
222 @Test
223 public void testHappyPathZkCacheBaseDataAccessor()
224 {
225 String className = TestHelper.getTestClassName();
226 String methodName = TestHelper.getTestMethodName();
227 String clusterName = className + "_" + methodName;
228 System.out.println("START " + clusterName + " at "
229 + new Date(System.currentTimeMillis()));
230
231
232 String curStatePath =
233 PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
234 clusterName,
235 "localhost_8901");
236 String extViewPath =
237 PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
238
239 ZkBaseDataAccessor<ZNRecord> baseAccessor =
240 new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
241
242 baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
243
244 List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
245 ZkCacheBaseDataAccessor<ZNRecord> accessor =
246 new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
247 null,
248 cachePaths,
249 null);
250
251
252 boolean ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
253 Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
254
255
256 List<Callable<Boolean>> threads = new ArrayList<Callable<Boolean>>();
257 for (int i = 0; i < 2; i++)
258 {
259 threads.add(new TestCreateZkCacheBaseDataAccessor(accessor, clusterName, i));
260 }
261 TestHelper.startThreadsConcurrently(threads, 1000);
262
263
264
265 ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
266 Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
267
268
269 threads.clear();
270 for (int i = 0; i < 2; i++)
271 {
272 threads.add(new TestUpdateZkCacheBaseDataAccessor(accessor, clusterName, i));
273 }
274 TestHelper.startThreadsConcurrently(threads, 1000);
275
276
277
278 ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
279 Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
280
281
282 threads.clear();
283 for (int i = 0; i < 2; i++)
284 {
285 threads.add(new TestSetZkCacheBaseDataAccessor(accessor, clusterName, i));
286 }
287 TestHelper.startThreadsConcurrently(threads, 1000);
288
289
290
291 ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
292 Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
293
294 System.out.println("END " + clusterName + " at "
295 + new Date(System.currentTimeMillis()));
296 }
297 }