View Javadoc

1   package org.apache.helix;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.BufferedReader;
23  import java.io.InputStreamReader;
24  import java.io.PrintWriter;
25  import java.net.Socket;
26  import java.util.*;
27  import java.util.concurrent.BlockingQueue;
28  import java.util.concurrent.CountDownLatch;
29  
30  import org.I0Itec.zkclient.IZkChildListener;
31  import org.I0Itec.zkclient.IZkDataListener;
32  import org.I0Itec.zkclient.IZkStateListener;
33  import org.I0Itec.zkclient.ZkConnection;
34  import org.apache.helix.InstanceType;
35  import org.apache.helix.ZNRecord;
36  import org.apache.helix.PropertyKey.Builder;
37  import org.apache.helix.manager.zk.CallbackHandler;
38  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
39  import org.apache.helix.manager.zk.ZKHelixManager;
40  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
41  import org.apache.helix.manager.zk.ZkClient;
42  import org.apache.helix.model.ExternalView;
43  import org.apache.log4j.Logger;
44  import org.apache.zookeeper.WatchedEvent;
45  import org.apache.zookeeper.Watcher;
46  import org.apache.zookeeper.Watcher.Event.KeeperState;
47  import org.apache.zookeeper.ZooKeeper;
48  import org.apache.zookeeper.ZooKeeper.States;
49  
50  
51  public class ZkTestHelper
52  {
53    private static Logger LOG = Logger.getLogger(ZkTestHelper.class);
54  
55    static
56    {
57      // Logger.getRootLogger().setLevel(Level.DEBUG);
58    }
59    
60    public static void disconnectSession(final ZkClient zkClient) throws Exception
61    {
62      IZkStateListener listener = new IZkStateListener()
63      {
64        @Override
65        public void handleStateChanged(KeeperState state) throws Exception
66        {
67  //         System.err.println("disconnectSession handleStateChanged. state: " + state);
68        }
69  
70        @Override
71        public void handleNewSession() throws Exception
72        {
73          // make sure zkclient is connected again
74          zkClient.waitUntilConnected();
75  
76          ZkConnection connection = ((ZkConnection) zkClient.getConnection());
77          ZooKeeper curZookeeper = connection.getZookeeper();
78  
79          LOG.info("handleNewSession. sessionId: "
80              + Long.toHexString(curZookeeper.getSessionId()));
81        }
82      };
83  
84      zkClient.subscribeStateChanges(listener);
85      ZkConnection connection = ((ZkConnection) zkClient.getConnection());
86      ZooKeeper curZookeeper = connection.getZookeeper();
87      LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
88  
89      Watcher watcher = new Watcher()
90      {
91        @Override
92        public void process(WatchedEvent event)
93        {
94          LOG.info("Process watchEvent: " + event);
95        }
96      };
97  
98      final ZooKeeper dupZookeeper =
99          new ZooKeeper(connection.getServers(),
100                       curZookeeper.getSessionTimeout(),
101                       watcher,
102                       curZookeeper.getSessionId(),
103                       curZookeeper.getSessionPasswd());
104     // wait until connected, then close
105     while (dupZookeeper.getState() != States.CONNECTED)
106     {
107       Thread.sleep(10);
108     }
109     dupZookeeper.close();
110 
111     connection = (ZkConnection) zkClient.getConnection();
112     curZookeeper = connection.getZookeeper();
113     zkClient.unsubscribeStateChanges(listener);
114 
115     // System.err.println("zk: " + oldZookeeper);
116     LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
117   }
118 
119   public static void expireSession(final ZkClient zkClient) throws Exception
120   {
121     final CountDownLatch waitExpire = new CountDownLatch(1);
122 
123     IZkStateListener listener = new IZkStateListener()
124     {
125       @Override
126       public void handleStateChanged(KeeperState state) throws Exception
127       {
128 //         System.err.println("handleStateChanged. state: " + state);
129       }
130 
131       @Override
132       public void handleNewSession() throws Exception
133       {
134         // make sure zkclient is connected again
135         zkClient.waitUntilConnected();
136 
137         ZkConnection connection = ((ZkConnection) zkClient.getConnection());
138         ZooKeeper curZookeeper = connection.getZookeeper();
139 
140         LOG.info("handleNewSession. sessionId: "
141             + Long.toHexString(curZookeeper.getSessionId()));
142         waitExpire.countDown();
143       }
144     };
145 
146     zkClient.subscribeStateChanges(listener);
147 
148     ZkConnection connection = ((ZkConnection) zkClient.getConnection());
149     ZooKeeper curZookeeper = connection.getZookeeper();
150     LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
151 
152     Watcher watcher = new Watcher()
153     {
154       @Override
155       public void process(WatchedEvent event)
156       {
157         LOG.info("Process watchEvent: " + event);
158       }
159     };
160 
161     final ZooKeeper dupZookeeper =
162         new ZooKeeper(connection.getServers(),
163                       curZookeeper.getSessionTimeout(),
164                       watcher,
165                       curZookeeper.getSessionId(),
166                       curZookeeper.getSessionPasswd());
167     // wait until connected, then close
168     while (dupZookeeper.getState() != States.CONNECTED)
169     {
170       Thread.sleep(10);
171     }
172     dupZookeeper.close();
173 
174     // make sure session expiry really happens
175     waitExpire.await();
176     zkClient.unsubscribeStateChanges(listener);
177 
178     connection = (ZkConnection) zkClient.getConnection();
179     curZookeeper = connection.getZookeeper();
180 
181     // System.err.println("zk: " + oldZookeeper);
182     LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
183   }
184 
185   /*
186    * stateMap: partition->instance->state
187    */
188   public static boolean verifyState(ZkClient zkclient,
189                                     String clusterName,
190                                     String resourceName,
191                                     Map<String, Map<String, String>> expectStateMap,
192                                     String op)
193   {
194     boolean result = true;
195     ZkBaseDataAccessor<ZNRecord> baseAccessor =
196         new ZkBaseDataAccessor<ZNRecord>(zkclient);
197     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
198     Builder keyBuilder = accessor.keyBuilder();
199 
200     ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));
201     Map<String, Map<String, String>> actualStateMap = extView.getRecord().getMapFields();
202     for (String partition : actualStateMap.keySet())
203     {
204       for (String expectPartiton : expectStateMap.keySet())
205       {
206         if (!partition.matches(expectPartiton))
207         {
208           continue;
209         }
210 
211         Map<String, String> actualInstanceStateMap = actualStateMap.get(partition);
212         Map<String, String> expectInstanceStateMap = expectStateMap.get(expectPartiton);
213         for (String instance : actualInstanceStateMap.keySet())
214         {
215           for (String expectInstance : expectStateMap.get(expectPartiton).keySet())
216           {
217             if (!instance.matches(expectInstance))
218             {
219               continue;
220             }
221 
222             String actualState = actualInstanceStateMap.get(instance);
223             String expectState = expectInstanceStateMap.get(expectInstance);
224             boolean equals = expectState.equals(actualState);
225             if (op.equals("==") && !equals || op.equals("!=") && equals)
226             {
227               System.out.println(partition + "/" + instance
228                   + " state mismatch. actual state: " + actualState + ", but expect: "
229                   + expectState + ", op: " + op);
230               result = false;
231             }
232           }
233         }
234       }
235     }
236     return result;
237   }
238   
239   /**
240    * return the number of listeners on given zk-path
241    * @param zkAddr
242    * @param path
243    * @return
244    * @throws Exception
245    */
246   public static int numberOfListeners(String zkAddr, String path) throws Exception
247   {
248 	  Map<String, Set<String>> listenerMap = getListenersByZkPath(zkAddr);
249 	  if (listenerMap.containsKey(path)) {
250 		  return listenerMap.get(path).size();
251 	  }
252 	  return 0;
253   }
254   
255   /**
256    * return a map from zk-path to a set of zk-session-id that put watches on the zk-path
257    * 
258    * @param zkAddr
259    * @return
260    * @throws Exception
261    */
262   public static Map<String, Set<String>> getListenersByZkPath(String zkAddr) throws Exception
263   {
264     String splits[] = zkAddr.split(":");
265     Map<String, Set<String>> listenerMap = new TreeMap<String, Set<String>>();
266     Socket sock = null;
267     int retry = 5;
268     
269     while (retry > 0) {
270       try {
271         sock = new Socket(splits[0], Integer.parseInt(splits[1]));
272         PrintWriter out = new PrintWriter(sock.getOutputStream(), true);
273         BufferedReader in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
274     
275         out.println("wchp");
276     
277         listenerMap.clear();
278         String lastPath = null;
279         String line = in.readLine();
280         while (line != null)
281         {
282         	line = line.trim();
283         	
284         	if (line.startsWith("/")) {
285         		lastPath = line;
286         		if (!listenerMap.containsKey(lastPath)) {
287         			listenerMap.put(lastPath, new TreeSet<String>());
288         		}
289         	} else if (line.startsWith("0x")) {
290         		if (lastPath != null && listenerMap.containsKey(lastPath) ) {
291         			listenerMap.get(lastPath).add(line);
292         		} else
293         		{
294         			LOG.error("Not path associated with listener sessionId: " + line + ", lastPath: " + lastPath);
295         		}
296         	} else
297         	{
298     //    		LOG.error("unrecognized line: " + line);
299         	}
300           line = in.readLine();
301         }
302         break;
303       } catch (Exception e) {
304     	  // sometimes in test, we see connection-reset exceptions when in.readLine()
305     	  // so add this retry logic
306     	  retry--;
307       } finally
308       {
309     	if (sock != null)
310     		sock.close();
311       }
312     }
313     return listenerMap;
314   }
315 
316   /**
317    * return a map from session-id to a set of zk-path that the session has watches on
318    * 
319    * @return
320    */
321   public static Map<String, Set<String>> getListenersBySession(String zkAddr) throws Exception {
322 	  Map<String, Set<String>> listenerMapByInstance = getListenersByZkPath(zkAddr);
323 	  
324 	  // convert to index by sessionId
325 	  Map<String, Set<String>> listenerMapBySession = new TreeMap<String, Set<String>>();
326 	  for (String path : listenerMapByInstance.keySet()) {
327 		  for (String sessionId : listenerMapByInstance.get(path)) {
328 			  if (!listenerMapBySession.containsKey(sessionId)) {
329 				  listenerMapBySession.put(sessionId, new TreeSet<String>());
330 			  }
331 			  listenerMapBySession.get(sessionId).add(path);
332 		  }
333 	  }
334 
335 	  return listenerMapBySession;
336   }
337     static java.lang.reflect.Field getField(Class clazz, String fieldName) throws NoSuchFieldException {
338         try {
339             return clazz.getDeclaredField(fieldName);
340         } catch (NoSuchFieldException e) {
341             Class superClass = clazz.getSuperclass();
342             if (superClass == null) {
343                 throw e;
344             } else {
345                 return getField(superClass, fieldName);
346             }
347         }
348     }
349 
350     public static Map<String, List<String>> getZkWatch(ZkClient client) throws Exception {
351         Map<String, List<String>> lists = new HashMap<String, List<String>>();
352         ZkConnection connection = ((ZkConnection) client.getConnection());
353         ZooKeeper zk = connection.getZookeeper();
354 
355         java.lang.reflect.Field field = getField(zk.getClass(), "watchManager");
356         field.setAccessible(true);
357         Object watchManager = field.get(zk);
358 
359         java.lang.reflect.Field field2 = getField(watchManager.getClass(), "dataWatches");
360         field2.setAccessible(true);
361         HashMap<String, Set<Watcher>> dataWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
362 
363         field2 = getField(watchManager.getClass(), "existWatches");
364         field2.setAccessible(true);
365         HashMap<String, Set<Watcher>> existWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
366 
367         field2 = getField(watchManager.getClass(), "childWatches");
368         field2.setAccessible(true);
369         HashMap<String, Set<Watcher>> childWatches = (HashMap<String, Set<Watcher>>) field2.get(watchManager);
370 
371         lists.put("dataWatches", new ArrayList<String>(dataWatches.keySet()));
372         lists.put("existWatches", new ArrayList<String>(existWatches.keySet()));
373         lists.put("childWatches", new ArrayList<String>(childWatches.keySet()));
374 
375         return lists;
376     }
377 
378     public static Map<String, Set<IZkDataListener>> getZkDataListener(ZkClient client) throws Exception {
379         java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
380         field.setAccessible(true);
381         Map<String, Set<IZkDataListener>> dataListener = (Map<String, Set<IZkDataListener>>)field.get(client);
382         return dataListener;
383     }
384 
385     public static Map<String, Set<IZkChildListener>> getZkChildListener(ZkClient client) throws Exception {
386         java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
387         field.setAccessible(true);
388         Map<String, Set<IZkChildListener>> childListener = (Map<String, Set<IZkChildListener>>)field.get(client);
389         return childListener;
390     }
391 
392 
393     public static boolean tryWaitZkEventsCleaned(ZkClient zkclient) throws Exception {
394         java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
395         field.setAccessible(true);
396         Object eventThread = field.get(zkclient);
397         // System.out.println("field: " + eventThread);
398 
399         java.lang.reflect.Field field2 = getField(eventThread.getClass(), "_events");
400         field2.setAccessible(true);
401         BlockingQueue queue = (BlockingQueue) field2.get(eventThread);
402         // System.out.println("field2: " + queue + ", " + queue.size());
403 
404 
405         if (queue == null) {
406             LOG.error("fail to get event-queue from zkclient. skip waiting");
407             return false;
408         }
409 
410         for (int i = 0; i < 20; i++) {
411             if (queue.size() == 0) {
412                 return true;
413             }
414             Thread.sleep(100);
415             System.out.println("pending zk-events in queue: " + queue);
416         }
417         return false;
418     }
419 }