1 package org.apache.helix;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.BufferedReader;
23 import java.io.InputStreamReader;
24 import java.io.PrintWriter;
25 import java.net.Socket;
26 import java.util.*;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.CountDownLatch;
29
30 import org.I0Itec.zkclient.IZkChildListener;
31 import org.I0Itec.zkclient.IZkDataListener;
32 import org.I0Itec.zkclient.IZkStateListener;
33 import org.I0Itec.zkclient.ZkConnection;
34 import org.apache.helix.InstanceType;
35 import org.apache.helix.ZNRecord;
36 import org.apache.helix.PropertyKey.Builder;
37 import org.apache.helix.manager.zk.CallbackHandler;
38 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
39 import org.apache.helix.manager.zk.ZKHelixManager;
40 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
41 import org.apache.helix.manager.zk.ZkClient;
42 import org.apache.helix.model.ExternalView;
43 import org.apache.log4j.Logger;
44 import org.apache.zookeeper.WatchedEvent;
45 import org.apache.zookeeper.Watcher;
46 import org.apache.zookeeper.Watcher.Event.KeeperState;
47 import org.apache.zookeeper.ZooKeeper;
48 import org.apache.zookeeper.ZooKeeper.States;
49
50
51 public class ZkTestHelper
52 {
53 private static Logger LOG = Logger.getLogger(ZkTestHelper.class);
54
55 static
56 {
57
58 }
59
60 public static void disconnectSession(final ZkClient zkClient) throws Exception
61 {
62 IZkStateListener listener = new IZkStateListener()
63 {
64 @Override
65 public void handleStateChanged(KeeperState state) throws Exception
66 {
67
68 }
69
70 @Override
71 public void handleNewSession() throws Exception
72 {
73
74 zkClient.waitUntilConnected();
75
76 ZkConnection connection = ((ZkConnection) zkClient.getConnection());
77 ZooKeeper curZookeeper = connection.getZookeeper();
78
79 LOG.info("handleNewSession. sessionId: "
80 + Long.toHexString(curZookeeper.getSessionId()));
81 }
82 };
83
84 zkClient.subscribeStateChanges(listener);
85 ZkConnection connection = ((ZkConnection) zkClient.getConnection());
86 ZooKeeper curZookeeper = connection.getZookeeper();
87 LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
88
89 Watcher watcher = new Watcher()
90 {
91 @Override
92 public void process(WatchedEvent event)
93 {
94 LOG.info("Process watchEvent: " + event);
95 }
96 };
97
98 final ZooKeeper dupZookeeper =
99 new ZooKeeper(connection.getServers(),
100 curZookeeper.getSessionTimeout(),
101 watcher,
102 curZookeeper.getSessionId(),
103 curZookeeper.getSessionPasswd());
104
105 while (dupZookeeper.getState() != States.CONNECTED)
106 {
107 Thread.sleep(10);
108 }
109 dupZookeeper.close();
110
111 connection = (ZkConnection) zkClient.getConnection();
112 curZookeeper = connection.getZookeeper();
113 zkClient.unsubscribeStateChanges(listener);
114
115
116 LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
117 }
118
119 public static void expireSession(final ZkClient zkClient) throws Exception
120 {
121 final CountDownLatch waitExpire = new CountDownLatch(1);
122
123 IZkStateListener listener = new IZkStateListener()
124 {
125 @Override
126 public void handleStateChanged(KeeperState state) throws Exception
127 {
128
129 }
130
131 @Override
132 public void handleNewSession() throws Exception
133 {
134
135 zkClient.waitUntilConnected();
136
137 ZkConnection connection = ((ZkConnection) zkClient.getConnection());
138 ZooKeeper curZookeeper = connection.getZookeeper();
139
140 LOG.info("handleNewSession. sessionId: "
141 + Long.toHexString(curZookeeper.getSessionId()));
142 waitExpire.countDown();
143 }
144 };
145
146 zkClient.subscribeStateChanges(listener);
147
148 ZkConnection connection = ((ZkConnection) zkClient.getConnection());
149 ZooKeeper curZookeeper = connection.getZookeeper();
150 LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
151
152 Watcher watcher = new Watcher()
153 {
154 @Override
155 public void process(WatchedEvent event)
156 {
157 LOG.info("Process watchEvent: " + event);
158 }
159 };
160
161 final ZooKeeper dupZookeeper =
162 new ZooKeeper(connection.getServers(),
163 curZookeeper.getSessionTimeout(),
164 watcher,
165 curZookeeper.getSessionId(),
166 curZookeeper.getSessionPasswd());
167
168 while (dupZookeeper.getState() != States.CONNECTED)
169 {
170 Thread.sleep(10);
171 }
172 dupZookeeper.close();
173
174
175 waitExpire.await();
176 zkClient.unsubscribeStateChanges(listener);
177
178 connection = (ZkConnection) zkClient.getConnection();
179 curZookeeper = connection.getZookeeper();
180
181
182 LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
183 }
184
185
186
187
188 public static boolean verifyState(ZkClient zkclient,
189 String clusterName,
190 String resourceName,
191 Map<String, Map<String, String>> expectStateMap,
192 String op)
193 {
194 boolean result = true;
195 ZkBaseDataAccessor<ZNRecord> baseAccessor =
196 new ZkBaseDataAccessor<ZNRecord>(zkclient);
197 ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
198 Builder keyBuilder = accessor.keyBuilder();
199
200 ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));
201 Map<String, Map<String, String>> actualStateMap = extView.getRecord().getMapFields();
202 for (String partition : actualStateMap.keySet())
203 {
204 for (String expectPartiton : expectStateMap.keySet())
205 {
206 if (!partition.matches(expectPartiton))
207 {
208 continue;
209 }
210
211 Map<String, String> actualInstanceStateMap = actualStateMap.get(partition);
212 Map<String, String> expectInstanceStateMap = expectStateMap.get(expectPartiton);
213 for (String instance : actualInstanceStateMap.keySet())
214 {
215 for (String expectInstance : expectStateMap.get(expectPartiton).keySet())
216 {
217 if (!instance.matches(expectInstance))
218 {
219 continue;
220 }
221
222 String actualState = actualInstanceStateMap.get(instance);
223 String expectState = expectInstanceStateMap.get(expectInstance);
224 boolean equals = expectState.equals(actualState);
225 if (op.equals("==") && !equals || op.equals("!=") && equals)
226 {
227 System.out.println(partition + "/" + instance
228 + " state mismatch. actual state: " + actualState + ", but expect: "
229 + expectState + ", op: " + op);
230 result = false;
231 }
232 }
233 }
234 }
235 }
236 return result;
237 }
238
239
240
241
242
243
244
245
246 public static int numberOfListeners(String zkAddr, String path) throws Exception
247 {
248 Map<String, Set<String>> listenerMap = getListenersByZkPath(zkAddr);
249 if (listenerMap.containsKey(path)) {
250 return listenerMap.get(path).size();
251 }
252 return 0;
253 }
254
255
256
257
258
259
260
261
262 public static Map<String, Set<String>> getListenersByZkPath(String zkAddr) throws Exception
263 {
264 String splits[] = zkAddr.split(":");
265 Map<String, Set<String>> listenerMap = new TreeMap<String, Set<String>>();
266 Socket sock = null;
267 int retry = 5;
268
269 while (retry > 0) {
270 try {
271 sock = new Socket(splits[0], Integer.parseInt(splits[1]));
272 PrintWriter out = new PrintWriter(sock.getOutputStream(), true);
273 BufferedReader in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
274
275 out.println("wchp");
276
277 listenerMap.clear();
278 String lastPath = null;
279 String line = in.readLine();
280 while (line != null)
281 {
282 line = line.trim();
283
284 if (line.startsWith("/")) {
285 lastPath = line;
286 if (!listenerMap.containsKey(lastPath)) {
287 listenerMap.put(lastPath, new TreeSet<String>());
288 }
289 } else if (line.startsWith("0x")) {
290 if (lastPath != null && listenerMap.containsKey(lastPath) ) {
291 listenerMap.get(lastPath).add(line);
292 } else
293 {
294 LOG.error("Not path associated with listener sessionId: " + line + ", lastPath: " + lastPath);
295 }
296 } else
297 {
298
299 }
300 line = in.readLine();
301 }
302 break;
303 } catch (Exception e) {
304
305
306 retry--;
307 } finally
308 {
309 if (sock != null)
310 sock.close();
311 }
312 }
313 return listenerMap;
314 }
315
316
317
318
319
320
321 public static Map<String, Set<String>> getListenersBySession(String zkAddr) throws Exception {
322 Map<String, Set<String>> listenerMapByInstance = getListenersByZkPath(zkAddr);
323
324
325 Map<String, Set<String>> listenerMapBySession = new TreeMap<String, Set<String>>();
326 for (String path : listenerMapByInstance.keySet()) {
327 for (String sessionId : listenerMapByInstance.get(path)) {
328 if (!listenerMapBySession.containsKey(sessionId)) {
329 listenerMapBySession.put(sessionId, new TreeSet<String>());
330 }
331 listenerMapBySession.get(sessionId).add(path);
332 }
333 }
334
335 return listenerMapBySession;
336 }
337 static java.lang.reflect.Field getField(Class clazz, String fieldName) throws NoSuchFieldException {
338 try {
339 return clazz.getDeclaredField(fieldName);
340 } catch (NoSuchFieldException e) {
341 Class superClass = clazz.getSuperclass();
342 if (superClass == null) {
343 throw e;
344 } else {
345 return getField(superClass, fieldName);
346 }
347 }
348 }
349
350 public static Map<String, List<String>> getZkWatch(ZkClient client) throws Exception {
351 Map<String, List<String>> lists = new HashMap<String, List<String>>();
352 ZkConnection connection = ((ZkConnection) client.getConnection());
353 ZooKeeper zk = connection.getZookeeper();
354
355 java.lang.reflect.Field field = getField(zk.getClass(), "watchManager");
356 field.setAccessible(true);
357 Object watchManager = field.get(zk);
358
359 java.lang.reflect.Field field2 = getField(watchManager.getClass(), "dataWatches");
360 field2.setAccessible(true);
361 HashMap<String, Set<Watcher>> dataWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
362
363 field2 = getField(watchManager.getClass(), "existWatches");
364 field2.setAccessible(true);
365 HashMap<String, Set<Watcher>> existWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
366
367 field2 = getField(watchManager.getClass(), "childWatches");
368 field2.setAccessible(true);
369 HashMap<String, Set<Watcher>> childWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
370
371 lists.put("dataWatches", new ArrayList<String>(dataWatches.keySet()));
372 lists.put("existWatches", new ArrayList<String>(existWatches.keySet()));
373 lists.put("childWatches", new ArrayList<String>(childWatches.keySet()));
374
375 return lists;
376 }
377
378 public static Map<String, Set<IZkDataListener>> getZkDataListener(ZkClient client) throws Exception {
379 java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
380 field.setAccessible(true);
381 Map<String, Set<IZkDataListener>> dataListener = (Map<String, Set<IZkDataListener>>)field.get(client);
382 return dataListener;
383 }
384
385 public static Map<String, Set<IZkChildListener>> getZkChildListener(ZkClient client) throws Exception {
386 java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
387 field.setAccessible(true);
388 Map<String, Set<IZkChildListener>> childListener = (Map<String, Set<IZkChildListener>>)field.get(client);
389 return childListener;
390 }
391
392
393 public static boolean tryWaitZkEventsCleaned(ZkClient zkclient) throws Exception {
394 java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
395 field.setAccessible(true);
396 Object eventThread = field.get(zkclient);
397
398
399 java.lang.reflect.Field field2 = getField(eventThread.getClass(), "_events");
400 field2.setAccessible(true);
401 BlockingQueue queue = (BlockingQueue) field2.get(eventThread);
402
403
404
405 if (queue == null) {
406 LOG.error("fail to get event-queue from zkclient. skip waiting");
407 return false;
408 }
409
410 for (int i = 0; i < 20; i++) {
411 if (queue.size() == 0) {
412 return true;
413 }
414 Thread.sleep(100);
415 System.out.println("pending zk-events in queue: " + queue);
416 }
417 return false;
418 }
419 }