1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import java.util.Date;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.TreeSet;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31
32 import org.apache.helix.AccessOption;
33 import org.apache.helix.PropertyPathConfig;
34 import org.apache.helix.PropertyType;
35 import org.apache.helix.TestHelper;
36 import org.apache.helix.ZNRecord;
37 import org.apache.helix.ZNRecordUpdater;
38 import org.apache.helix.ZkUnitTestBase;
39 import org.apache.helix.manager.zk.ZNRecordSerializer;
40 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
41 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
42 import org.apache.helix.manager.zk.ZkClient;
43 import org.apache.helix.store.HelixPropertyListener;
44 import org.testng.Assert;
45 import org.testng.annotations.Test;
46
47
48 public class TestZkCacheSyncOpSingleThread extends ZkUnitTestBase
49 {
50 class TestListener implements HelixPropertyListener
51 {
52 ConcurrentLinkedQueue<String> _deletePathQueue = new ConcurrentLinkedQueue<String>();
53 ConcurrentLinkedQueue<String> _createPathQueue = new ConcurrentLinkedQueue<String>();
54 ConcurrentLinkedQueue<String> _changePathQueue = new ConcurrentLinkedQueue<String>();
55
56 @Override
57 public void onDataDelete(String path)
58 {
59
60 _deletePathQueue.add(path);
61 }
62
63 @Override
64 public void onDataCreate(String path)
65 {
66
67 _createPathQueue.add(path);
68 }
69
70 @Override
71 public void onDataChange(String path)
72 {
73
74 _changePathQueue.add(path);
75 }
76
77 public void reset()
78 {
79 _deletePathQueue.clear();
80 _createPathQueue.clear();
81 _changePathQueue.clear();
82 }
83 }
84
85 @Test
86 public void testZkCacheCallbackExternalOpNoChroot() throws Exception
87 {
88 String className = TestHelper.getTestClassName();
89 String methodName = TestHelper.getTestMethodName();
90 String clusterName = className + "_" + methodName;
91 System.out.println("START " + clusterName + " at "
92 + new Date(System.currentTimeMillis()));
93
94
95 ZkClient zkclient = new ZkClient(ZK_ADDR);
96 zkclient.setZkSerializer(new ZNRecordSerializer());
97 ZkBaseDataAccessor<ZNRecord> extBaseAccessor =
98 new ZkBaseDataAccessor<ZNRecord>(zkclient);
99
100
101 String curStatePath =
102 PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
103 clusterName,
104 "localhost_8901");
105 String extViewPath =
106 PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
107
108 ZkBaseDataAccessor<ZNRecord> baseAccessor =
109 new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
110
111 extBaseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
112
113 List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
114 ZkCacheBaseDataAccessor<ZNRecord> accessor =
115 new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, cachePaths);
116
117
118 TestListener listener = new TestListener();
119 accessor.subscribe(curStatePath, listener);
120
121
122 List<String> createPaths = new ArrayList<String>();
123 for (int i = 0; i < 10; i++)
124 {
125 String path = curStatePath + "/session_0/TestDB" + i;
126 createPaths.add(path);
127 boolean success =
128 extBaseAccessor.create(path,
129 new ZNRecord("TestDB" + i),
130 AccessOption.PERSISTENT);
131 Assert.assertTrue(success, "Should succeed in create: " + path);
132 }
133
134 Thread.sleep(500);
135
136
137
138 boolean ret =
139 TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
140
141 Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
142 System.out.println("createCnt: " + listener._createPathQueue.size());
143 Assert.assertEquals(listener._createPathQueue.size(),
144 11,
145 "Shall get 11 onCreate callbacks.");
146
147
148 createPaths.add(curStatePath + "/session_0");
149 List<String> createCallbackPaths = new ArrayList<String>(listener._createPathQueue);
150 Collections.sort(createPaths);
151 Collections.sort(createCallbackPaths);
152
153 Assert.assertEquals(createCallbackPaths,
154 createPaths,
155 "Should get create callbacks at " + createPaths + ", but was "
156 + createCallbackPaths);
157
158
159 List<String> updatePaths = new ArrayList<String>();
160 listener.reset();
161 for (int i = 0; i < 10; i++)
162 {
163 String path = curStatePath + "/session_0/TestDB" + i;
164 for (int j = 0; j < 10; j++)
165 {
166 updatePaths.add(path);
167 ZNRecord newRecord = new ZNRecord("TestDB" + i);
168 newRecord.setSimpleField("" + j, "" + j);
169 boolean success =
170 accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
171 Assert.assertTrue(success, "Should succeed in update: " + path);
172 }
173 }
174 Thread.sleep(500);
175
176
177
178 ret =
179 TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
180
181 Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
182 System.out.println("changeCnt: " + listener._changePathQueue.size());
183 Assert.assertEquals(listener._changePathQueue.size(),
184 100,
185 "Shall get 100 onChange callbacks.");
186
187
188 List<String> updateCallbackPaths = new ArrayList<String>(listener._changePathQueue);
189 Collections.sort(updatePaths);
190 Collections.sort(updateCallbackPaths);
191 Assert.assertEquals(updateCallbackPaths,
192 updatePaths,
193 "Should get change callbacks at " + updatePaths + ", but was "
194 + updateCallbackPaths);
195
196
197 TreeSet<String> removePaths = new TreeSet<String>();
198 listener.reset();
199 for (int i = 0; i < 10; i++)
200 {
201 String path = curStatePath + "/session_0/TestDB" + i;
202 removePaths.add(path);
203 boolean success = accessor.remove(path, AccessOption.PERSISTENT);
204 Assert.assertTrue(success, "Should succeed in remove: " + path);
205 }
206 Thread.sleep(500);
207
208
209
210 ret =
211 TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
212
213 Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
214 System.out.println("deleteCnt: " + listener._deletePathQueue.size());
215 Assert.assertTrue(listener._deletePathQueue.size() >= 10,
216 "Shall get at least 10 onDelete callbacks.");
217
218
219 Set<String> removeCallbackPaths = new TreeSet<String>(listener._deletePathQueue);
220 Assert.assertEquals(removeCallbackPaths,
221 removePaths,
222 "Should get remove callbacks at " + removePaths + ", but was "
223 + removeCallbackPaths);
224
225 System.out.println("END " + clusterName + " at "
226 + new Date(System.currentTimeMillis()));
227 }
228 }