View Javadoc

1   package org.apache.helix;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.I0Itec.zkclient.IZkStateListener;
28  import org.I0Itec.zkclient.ZkConnection;
29  import org.I0Itec.zkclient.ZkServer;
30  import org.apache.helix.PropertyKey.Builder;
31  import org.apache.helix.controller.pipeline.Pipeline;
32  import org.apache.helix.controller.pipeline.Stage;
33  import org.apache.helix.controller.pipeline.StageContext;
34  import org.apache.helix.controller.stages.ClusterEvent;
35  import org.apache.helix.manager.zk.ZKHelixAdmin;
36  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
37  import org.apache.helix.manager.zk.ZNRecordSerializer;
38  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
39  import org.apache.helix.manager.zk.ZkClient;
40  import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
41  import org.apache.helix.model.IdealState;
42  import org.apache.helix.model.InstanceConfig;
43  import org.apache.helix.model.LiveInstance;
44  import org.apache.helix.model.Message;
45  import org.apache.helix.model.StateModelDefinition;
46  import org.apache.helix.model.IdealState.IdealStateModeProperty;
47  import org.apache.helix.model.Message.Attributes;
48  import org.apache.helix.model.Message.MessageType;
49  import org.apache.helix.tools.StateModelConfigGenerator;
50  import org.apache.helix.util.HelixUtil;
51  import org.apache.log4j.Logger;
52  import org.apache.zookeeper.WatchedEvent;
53  import org.apache.zookeeper.Watcher;
54  import org.apache.zookeeper.Watcher.Event.KeeperState;
55  import org.apache.zookeeper.ZooKeeper;
56  import org.testng.Assert;
57  import org.testng.AssertJUnit;
58  import org.testng.annotations.AfterSuite;
59  import org.testng.annotations.BeforeSuite;
60  
61  
62  // TODO merge code with ZkIntegrationTestBase
63  public class ZkUnitTestBase
64  {
65    private static Logger LOG = Logger.getLogger(ZkUnitTestBase.class);
66    protected static ZkServer _zkServer = null;
67    protected static ZkClient _gZkClient;
68  
69    public static final String ZK_ADDR = "localhost:2185";
70    protected static final String CLUSTER_PREFIX = "CLUSTER";
71    protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
72  
73    @BeforeSuite(alwaysRun = true)
74    public void beforeSuite() throws Exception
75    {
76      _zkServer = TestHelper.startZkSever(ZK_ADDR);
77      AssertJUnit.assertTrue(_zkServer != null);
78  
79      // System.out.println("Number of open zkClient before ZkUnitTests: "
80      // + ZkClient.getNumberOfConnections());
81  
82      _gZkClient = new ZkClient(ZK_ADDR);
83      _gZkClient.setZkSerializer(new ZNRecordSerializer());
84    }
85  
86    @AfterSuite(alwaysRun = true)
87    public void afterTest()
88    {
89      _gZkClient.close();
90      TestHelper.stopZkServer(_zkServer);
91      _zkServer = null;
92  
93      // System.out.println("Number of open zkClient after ZkUnitTests: "
94      // + ZkClient.getNumberOfConnections());
95  
96    }
97  
98    protected String getShortClassName()
99    {
100     String className = this.getClass().getName();
101     return className.substring(className.lastIndexOf('.') + 1);
102   }
103 
104   protected String getCurrentLeader(ZkClient zkClient, String clusterName)
105   {
106     String leaderPath =
107         HelixUtil.getControllerPropertyPath(clusterName, PropertyType.LEADER);
108     ZNRecord leaderRecord = zkClient.<ZNRecord> readData(leaderPath);
109     if (leaderRecord == null)
110     {
111       return null;
112     }
113 
114     String leader = leaderRecord.getSimpleField(PropertyType.LEADER.toString());
115     return leader;
116   }
117 
118   protected void stopCurrentLeader(ZkClient zkClient,
119                                    String clusterName,
120                                    Map<String, Thread> threadMap,
121                                    Map<String, HelixManager> managerMap)
122   {
123     String leader = getCurrentLeader(zkClient, clusterName);
124     Assert.assertTrue(leader != null);
125     System.out.println("stop leader:" + leader + " in " + clusterName);
126     Assert.assertTrue(leader != null);
127 
128     HelixManager manager = managerMap.remove(leader);
129     Assert.assertTrue(manager != null);
130     manager.disconnect();
131 
132     Thread thread = threadMap.remove(leader);
133     Assert.assertTrue(thread != null);
134     thread.interrupt();
135 
136     boolean isNewLeaderElected = false;
137     try
138     {
139       // Thread.sleep(2000);
140       for (int i = 0; i < 5; i++)
141       {
142         Thread.sleep(1000);
143         String newLeader = getCurrentLeader(zkClient, clusterName);
144         if (!newLeader.equals(leader))
145         {
146           isNewLeaderElected = true;
147           System.out.println("new leader elected: " + newLeader + " in " + clusterName);
148           break;
149         }
150       }
151     }
152     catch (InterruptedException e)
153     {
154       e.printStackTrace();
155     }
156     if (isNewLeaderElected == false)
157     {
158       System.out.println("fail to elect a new leader elected in " + clusterName);
159     }
160     AssertJUnit.assertTrue(isNewLeaderElected);
161   }
162 
163   public void verifyInstance(ZkClient zkClient,
164                              String clusterName,
165                              String instance,
166                              boolean wantExists)
167   {
168     // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
169     String instanceConfigsPath =
170         PropertyPathConfig.getPath(PropertyType.CONFIGS,
171                                    clusterName,
172                                    ConfigScopeProperty.PARTICIPANT.toString());
173     String instanceConfigPath = instanceConfigsPath + "/" + instance;
174     String instancePath = HelixUtil.getInstancePath(clusterName, instance);
175     AssertJUnit.assertEquals(wantExists, zkClient.exists(instanceConfigPath));
176     AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath));
177   }
178 
179   public void verifyResource(ZkClient zkClient,
180                              String clusterName,
181                              String resource,
182                              boolean wantExists)
183   {
184     String resourcePath = HelixUtil.getIdealStatePath(clusterName) + "/" + resource;
185     AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath));
186   }
187 
188   public void verifyEnabled(ZkClient zkClient,
189                             String clusterName,
190                             String instance,
191                             boolean wantEnabled)
192   {
193     ZKHelixDataAccessor accessor =
194         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
195     Builder keyBuilder = accessor.keyBuilder();
196 
197     InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instance));
198     AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled());
199   }
200 
201   public void verifyReplication(ZkClient zkClient,
202                                 String clusterName,
203                                 String resource,
204                                 int repl)
205   {
206     ZKHelixDataAccessor accessor =
207         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(zkClient));
208     Builder keyBuilder = accessor.keyBuilder();
209 
210     IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource));
211     for (String partitionName : idealState.getPartitionSet())
212     {
213       if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO)
214       {
215         AssertJUnit.assertEquals(repl, idealState.getPreferenceList(partitionName).size());
216       }
217       else if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
218       {
219         AssertJUnit.assertEquals(repl, idealState.getInstanceStateMap(partitionName)
220                                                  .size());
221       }
222     }
223   }
224 
225   protected void simulateSessionExpiry(ZkConnection zkConnection) throws IOException,
226       InterruptedException
227   {
228     ZooKeeper oldZookeeper = zkConnection.getZookeeper();
229     LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
230 
231     Watcher watcher = new Watcher()
232     {
233       @Override
234       public void process(WatchedEvent event)
235       {
236         LOG.info("In New connection, process event:" + event);
237       }
238     };
239 
240     ZooKeeper newZookeeper =
241         new ZooKeeper(zkConnection.getServers(),
242                       oldZookeeper.getSessionTimeout(),
243                       watcher,
244                       oldZookeeper.getSessionId(),
245                       oldZookeeper.getSessionPasswd());
246     LOG.info("New sessionId = " + newZookeeper.getSessionId());
247     // Thread.sleep(3000);
248     newZookeeper.close();
249     Thread.sleep(10000);
250     oldZookeeper = zkConnection.getZookeeper();
251     LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
252   }
253 
254   protected void simulateSessionExpiry(ZkClient zkClient) throws IOException,
255       InterruptedException
256   {
257     IZkStateListener listener = new IZkStateListener()
258     {
259       @Override
260       public void handleStateChanged(KeeperState state) throws Exception
261       {
262         LOG.info("In Old connection, state changed:" + state);
263       }
264 
265       @Override
266       public void handleNewSession() throws Exception
267       {
268         LOG.info("In Old connection, new session");
269       }
270     };
271     zkClient.subscribeStateChanges(listener);
272     ZkConnection connection = ((ZkConnection) zkClient.getConnection());
273     ZooKeeper oldZookeeper = connection.getZookeeper();
274     LOG.info("Old sessionId = " + oldZookeeper.getSessionId());
275 
276     Watcher watcher = new Watcher()
277     {
278       @Override
279       public void process(WatchedEvent event)
280       {
281         LOG.info("In New connection, process event:" + event);
282       }
283     };
284 
285     ZooKeeper newZookeeper =
286         new ZooKeeper(connection.getServers(),
287                       oldZookeeper.getSessionTimeout(),
288                       watcher,
289                       oldZookeeper.getSessionId(),
290                       oldZookeeper.getSessionPasswd());
291     LOG.info("New sessionId = " + newZookeeper.getSessionId());
292     // Thread.sleep(3000);
293     newZookeeper.close();
294     Thread.sleep(10000);
295     connection = (ZkConnection) zkClient.getConnection();
296     oldZookeeper = connection.getZookeeper();
297     LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
298   }
299 
300   protected void setupStateModel(String clusterName)
301   {
302     ZKHelixDataAccessor accessor =
303         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
304     Builder keyBuilder = accessor.keyBuilder();
305 
306     StateModelConfigGenerator generator = new StateModelConfigGenerator();
307     StateModelDefinition masterSlave =
308         new StateModelDefinition(generator.generateConfigForMasterSlave());
309     accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave);
310 
311     StateModelDefinition leaderStandby =
312         new StateModelDefinition(generator.generateConfigForLeaderStandby());
313     accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby);
314 
315     StateModelDefinition onlineOffline =
316         new StateModelDefinition(generator.generateConfigForOnlineOffline());
317     accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline);
318 
319   }
320 
321   protected List<IdealState> setupIdealState(String clusterName,
322                                              int[] nodes,
323                                              String[] resources,
324                                              int partitions,
325                                              int replicas)
326   {
327     ZKHelixDataAccessor accessor =
328         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
329     Builder keyBuilder = accessor.keyBuilder();
330 
331     List<IdealState> idealStates = new ArrayList<IdealState>();
332     List<String> instances = new ArrayList<String>();
333     for (int i : nodes)
334     {
335       instances.add("localhost_" + i);
336     }
337 
338     for (String resourceName : resources)
339     {
340       IdealState idealState = new IdealState(resourceName);
341       for (int p = 0; p < partitions; p++)
342       {
343         List<String> value = new ArrayList<String>();
344         for (int r = 0; r < replicas; r++)
345         {
346           int n = nodes[(p + r) % nodes.length];
347           value.add("localhost_" + n);
348         }
349         idealState.getRecord().setListField(resourceName + "_" + p, value);
350       }
351 
352       idealState.setReplicas(Integer.toString(replicas));
353       idealState.setStateModelDefRef("MasterSlave");
354       idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
355       idealState.setNumPartitions(partitions);
356       idealStates.add(idealState);
357 
358       // System.out.println(idealState);
359       accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
360     }
361     return idealStates;
362   }
363 
364   protected void setupLiveInstances(String clusterName, int[] liveInstances)
365   {
366     ZKHelixDataAccessor accessor =
367         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
368     Builder keyBuilder = accessor.keyBuilder();
369 
370     for (int i = 0; i < liveInstances.length; i++)
371     {
372       String instance = "localhost_" + liveInstances[i];
373       LiveInstance liveInstance = new LiveInstance(instance);
374       liveInstance.setSessionId("session_" + liveInstances[i]);
375       liveInstance.setHelixVersion("0.0.0");
376       accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
377     }
378   }
379 
380   protected void setupInstances(String clusterName, int[] instances)
381   {
382     HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
383     for (int i = 0; i < instances.length; i++)
384     {
385       String instance = "localhost_" + instances[i];
386       InstanceConfig instanceConfig = new InstanceConfig(instance);
387       instanceConfig.setHostName("localhost");
388       instanceConfig.setPort("" + instances[i]);
389       instanceConfig.setInstanceEnabled(true);
390       admin.addInstance(clusterName, instanceConfig);
391     }
392   }
393 
394   protected void runPipeline(ClusterEvent event, Pipeline pipeline)
395   {
396     try
397     {
398       pipeline.handle(event);
399       pipeline.finish();
400     }
401     catch (Exception e)
402     {
403       LOG.error("Exception while executing pipeline:" + pipeline
404           + ". Will not continue to next pipeline", e);
405     }
406   }
407 
408   protected void runStage(ClusterEvent event, Stage stage) throws Exception
409   {
410     StageContext context = new StageContext();
411     stage.init(context);
412     stage.preProcess();
413     stage.process(event);
414     stage.postProcess();
415   }
416 
417   protected Message createMessage(MessageType type,
418                                   String msgId,
419                                   String fromState,
420                                   String toState,
421                                   String resourceName,
422                                   String tgtName)
423   {
424     Message msg = new Message(type.toString(), msgId);
425     msg.setFromState(fromState);
426     msg.setToState(toState);
427     msg.getRecord().setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName);
428     msg.setTgtName(tgtName);
429     return msg;
430   }
431 
432 }