View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Map.Entry;
28  import java.util.Random;
29  import java.util.concurrent.ConcurrentHashMap;
30  
31  import org.apache.helix.HelixManager;
32  import org.apache.helix.PropertyPathConfig;
33  import org.apache.helix.PropertyType;
34  import org.apache.helix.TestHelper;
35  import org.apache.helix.ZNRecord;
36  import org.apache.helix.TestHelper.StartCMResult;
37  import org.apache.helix.controller.HelixControllerMain;
38  import org.apache.helix.manager.zk.ZNRecordSerializer;
39  import org.apache.helix.manager.zk.ZkClient;
40  import org.apache.helix.model.IdealState.IdealStateModeProperty;
41  import org.apache.helix.model.IdealState.IdealStateProperty;
42  import org.apache.helix.store.PropertyJsonSerializer;
43  import org.apache.helix.store.PropertyStoreException;
44  import org.apache.helix.tools.ClusterSetup;
45  import org.apache.helix.tools.ClusterStateVerifier;
46  import org.apache.helix.tools.DefaultIdealStateCalculator;
47  import org.apache.helix.tools.TestCommand;
48  import org.apache.helix.tools.TestExecutor;
49  import org.apache.helix.tools.TestTrigger;
50  import org.apache.helix.tools.ZnodeOpArg;
51  import org.apache.helix.tools.TestCommand.CommandType;
52  import org.apache.helix.tools.TestCommand.NodeOpArg;
53  import org.apache.helix.tools.TestExecutor.ZnodePropertyType;
54  import org.apache.log4j.Logger;
55  import org.testng.Assert;
56  
57  
58  public class TestDriver
59  {
60    private static Logger LOG = Logger.getLogger(TestDriver.class);
61    private static final String ZK_ADDR = ZkIntegrationTestBase.ZK_ADDR;
62  
63    // private static final String CLUSTER_PREFIX = "TestDriver";
64    private static final String STATE_MODEL = "MasterSlave";
65    private static final String TEST_DB_PREFIX = "TestDB";
66    private static final int START_PORT = 12918;
67    private static final String CONTROLLER_PREFIX = "controller";
68    private static final String PARTICIPANT_PREFIX = "localhost";
69    private static final Random RANDOM = new Random();
70    private static final PropertyJsonSerializer<ZNRecord> SERIALIZER = new PropertyJsonSerializer<ZNRecord>(
71        ZNRecord.class);
72  
73    private static final Map<String, TestInfo> _testInfoMap = new ConcurrentHashMap<String, TestInfo>();
74  
75    public static class TestInfo
76    {
77      public final ZkClient _zkClient;
78      public final String _clusterName;
79      public final int _numDb;
80      public final int _numPartitionsPerDb;
81      public final int _numNode;
82      public final int _replica;
83  
84      // public final Map<String, ZNRecord> _idealStateMap = new
85      // ConcurrentHashMap<String, ZNRecord>();
86      public final Map<String, StartCMResult> _startCMResultMap = new ConcurrentHashMap<String, StartCMResult>();
87  
88      public TestInfo(String clusterName, ZkClient zkClient, int numDb, int numPartitionsPerDb,
89          int numNode, int replica)
90      {
91        this._clusterName = clusterName;
92        this._zkClient = zkClient;
93        this._numDb = numDb;
94        this._numPartitionsPerDb = numPartitionsPerDb;
95        this._numNode = numNode;
96        this._replica = replica;
97      }
98    }
99  
100   public static TestInfo getTestInfo(String uniqClusterName)
101   {
102     if (!_testInfoMap.containsKey(uniqClusterName))
103     {
104       String errMsg = "Cluster hasn't been setup for " + uniqClusterName;
105       throw new IllegalArgumentException(errMsg);
106     }
107 
108     TestInfo testInfo = _testInfoMap.get(uniqClusterName);
109     return testInfo;
110   }
111 
112   public static void setupClusterWithoutRebalance(String uniqClusterName, String zkAddr,
113       int numResources, int numPartitionsPerResource, int numInstances, int replica)
114       throws Exception
115   {
116     setupCluster(uniqClusterName, zkAddr, numResources, numPartitionsPerResource, numInstances,
117         replica, false);
118   }
119 
120   public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
121       int numPartitionsPerResource, int numInstances, int replica) throws Exception
122   {
123     setupCluster(uniqClusterName, zkAddr, numResources, numPartitionsPerResource, numInstances,
124         replica, true);
125   }
126 
127   // public static void setupCluster(String uniqTestName, ZkClient zkClient, int
128   // numDb,
129   // int numPartitionPerDb, int numNodes, int replica, boolean doRebalance)
130   // throws Exception
131   public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
132       int numPartitionsPerResource, int numInstances, int replica, boolean doRebalance)
133       throws Exception
134   {
135     ZkClient zkClient = new ZkClient(zkAddr);
136     zkClient.setZkSerializer(new ZNRecordSerializer());
137 
138     // String clusterName = CLUSTER_PREFIX + "_" + uniqClusterName;
139     String clusterName = uniqClusterName;
140     if (zkClient.exists("/" + clusterName))
141     {
142       LOG.warn("test cluster already exists:" + clusterName + ", test name:" + uniqClusterName
143           + " is not unique or test has been run without cleaning up zk; deleting it");
144       zkClient.deleteRecursive("/" + clusterName);
145     }
146 
147     if (_testInfoMap.containsKey(uniqClusterName))
148     {
149       LOG.warn("test info already exists:" + uniqClusterName
150           + " is not unique or test has been run without cleaning up test info map; removing it");
151       _testInfoMap.remove(uniqClusterName);
152     }
153     TestInfo testInfo = new TestInfo(clusterName, zkClient, numResources, numPartitionsPerResource,
154         numInstances, replica);
155     _testInfoMap.put(uniqClusterName, testInfo);
156 
157     ClusterSetup setupTool = new ClusterSetup(zkAddr);
158     setupTool.addCluster(clusterName, true);
159 
160     for (int i = 0; i < numInstances; i++)
161     {
162       int port = START_PORT + i;
163       setupTool.addInstanceToCluster(clusterName, PARTICIPANT_PREFIX + "_" + port);
164     }
165 
166     for (int i = 0; i < numResources; i++)
167     {
168       String dbName = TEST_DB_PREFIX + i;
169       setupTool.addResourceToCluster(clusterName, dbName, numPartitionsPerResource,
170           STATE_MODEL);
171       if (doRebalance)
172       {
173         setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
174 
175         // String idealStatePath = "/" + clusterName + "/" +
176         // PropertyType.IDEALSTATES.toString() + "/"
177         // + dbName;
178         // ZNRecord idealState = zkClient.<ZNRecord> readData(idealStatePath);
179         // testInfo._idealStateMap.put(dbName, idealState);
180       }
181     }
182   }
183 
184   /**
185    * starting a dummy participant with a given id
186    *
187    * @param uniqueTestName
188    * @param instanceId
189    */
190   public static void startDummyParticipant(String uniqClusterName, int instanceId) throws Exception
191   {
192     startDummyParticipants(uniqClusterName, new int[] { instanceId });
193   }
194 
195   public static void startDummyParticipants(String uniqClusterName, int[] instanceIds)
196       throws Exception
197   {
198     if (!_testInfoMap.containsKey(uniqClusterName))
199     {
200       String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
201       throw new IllegalArgumentException(errMsg);
202     }
203 
204     TestInfo testInfo = _testInfoMap.get(uniqClusterName);
205     String clusterName = testInfo._clusterName;
206 
207     for (int id : instanceIds)
208     {
209       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + id);
210 
211       if (testInfo._startCMResultMap.containsKey(instanceName))
212       {
213         LOG.warn("Dummy participant:" + instanceName + " has already started; skip starting it");
214       } else
215       {
216         StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, clusterName, instanceName);
217         testInfo._startCMResultMap.put(instanceName, result);
218         // testInfo._instanceStarted.countDown();
219       }
220     }
221   }
222 
223   public static void startController(String uniqClusterName) throws Exception
224   {
225     startController(uniqClusterName, new int[] { 0 });
226   }
227 
228   public static void startController(String uniqClusterName, int[] nodeIds) throws Exception
229   {
230     if (!_testInfoMap.containsKey(uniqClusterName))
231     {
232       String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
233       throw new IllegalArgumentException(errMsg);
234     }
235 
236     TestInfo testInfo = _testInfoMap.get(uniqClusterName);
237     String clusterName = testInfo._clusterName;
238 
239     for (int id : nodeIds)
240     {
241       String controllerName = CONTROLLER_PREFIX + "_" + id;
242       if (testInfo._startCMResultMap.containsKey(controllerName))
243       {
244         LOG.warn("Controller:" + controllerName + " has already started; skip starting it");
245       } else
246       {
247         StartCMResult result = TestHelper.startController(clusterName, controllerName, ZK_ADDR,
248             HelixControllerMain.STANDALONE);
249         testInfo._startCMResultMap.put(controllerName, result);
250       }
251     }
252   }
253 
254   public static void verifyCluster(String uniqClusterName, long beginTime, long timeout)
255       throws Exception
256   {
257     Thread.sleep(beginTime);
258 
259     if (!_testInfoMap.containsKey(uniqClusterName))
260     {
261       String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
262       throw new IllegalArgumentException(errMsg);
263     }
264 
265     TestInfo testInfo = _testInfoMap.get(uniqClusterName);
266     String clusterName = testInfo._clusterName;
267 
268     boolean result = ClusterStateVerifier.verifyByPolling(
269         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName), timeout);
270     Assert.assertTrue(result);
271   }
272 
273   public static void stopCluster(String uniqClusterName) throws Exception
274   {
275     if (!_testInfoMap.containsKey(uniqClusterName))
276     {
277       String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
278       throw new IllegalArgumentException(errMsg);
279     }
280     TestInfo testInfo = _testInfoMap.remove(uniqClusterName);
281 
282     // stop controller first
283     for (Iterator<Entry<String, StartCMResult>> it = testInfo._startCMResultMap.entrySet()
284         .iterator(); it.hasNext();)
285     {
286       Map.Entry<String, StartCMResult> entry = it.next();
287       String instanceName = entry.getKey();
288       if (instanceName.startsWith(CONTROLLER_PREFIX))
289       {
290         it.remove();
291         HelixManager manager = entry.getValue()._manager;
292         manager.disconnect();
293         Thread thread = entry.getValue()._thread;
294         thread.interrupt();
295       }
296     }
297 
298     Thread.sleep(1000);
299 
300     // stop the rest
301     for (Map.Entry<String, StartCMResult> entry : testInfo._startCMResultMap.entrySet())
302     {
303       HelixManager manager = entry.getValue()._manager;
304       manager.disconnect();
305       Thread thread = entry.getValue()._thread;
306       thread.interrupt();
307     }
308 
309     testInfo._zkClient.close();
310   }
311 
312   public static void stopDummyParticipant(String uniqClusterName, long beginTime, int instanceId)
313       throws Exception
314   {
315     if (!_testInfoMap.containsKey(uniqClusterName))
316     {
317 
318       String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
319       throw new Exception(errMsg);
320     }
321 
322     TestInfo testInfo = _testInfoMap.get(uniqClusterName);
323     // String clusterName = testInfo._clusterName;
324 
325     String failHost = PARTICIPANT_PREFIX + "_" + (START_PORT + instanceId);
326     StartCMResult result = testInfo._startCMResultMap.remove(failHost);
327 
328     // TODO need sync
329     if (result == null || result._manager == null || result._thread == null)
330     {
331       String errMsg = "Dummy participant:" + failHost + " seems not running";
332       LOG.error(errMsg);
333     } else
334     {
335       // System.err.println("try to stop participant: " +
336       // result._manager.getInstanceName());
337       NodeOpArg arg = new NodeOpArg(result._manager, result._thread);
338       TestCommand command = new TestCommand(CommandType.STOP, new TestTrigger(beginTime), arg);
339       List<TestCommand> commandList = new ArrayList<TestCommand>();
340       commandList.add(command);
341       TestExecutor.executeTestAsync(commandList, ZK_ADDR);
342     }
343   }
344 
345   public static void setIdealState(String uniqClusterName, long beginTime, int percentage)
346       throws Exception
347   {
348     if (!_testInfoMap.containsKey(uniqClusterName))
349     {
350       String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
351       throw new IllegalArgumentException(errMsg);
352     }
353     TestInfo testInfo = _testInfoMap.get(uniqClusterName);
354     String clusterName = testInfo._clusterName;
355     List<String> instanceNames = new ArrayList<String>();
356 
357     for (int i = 0; i < testInfo._numNode; i++)
358     {
359       int port = START_PORT + i;
360       instanceNames.add(PARTICIPANT_PREFIX + "_" + port);
361     }
362 
363     List<TestCommand> commandList = new ArrayList<TestCommand>();
364     for (int i = 0; i < testInfo._numDb; i++)
365     {
366       String dbName = TEST_DB_PREFIX + i;
367       ZNRecord destIS = DefaultIdealStateCalculator.calculateIdealState(instanceNames,
368           testInfo._numPartitionsPerDb, testInfo._replica - 1, dbName, "MASTER", "SLAVE");
369       // destIS.setId(dbName);
370       destIS.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(),
371           IdealStateModeProperty.CUSTOMIZED.toString());
372       destIS.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
373           Integer.toString(testInfo._numPartitionsPerDb));
374       destIS.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), STATE_MODEL);
375       destIS.setSimpleField(IdealStateProperty.REPLICAS.toString(), "" + testInfo._replica);
376       // String idealStatePath = "/" + clusterName + "/" +
377       // PropertyType.IDEALSTATES.toString() + "/"
378       // + TEST_DB_PREFIX + i;
379       ZNRecord initIS = new ZNRecord(dbName); // _zkClient.<ZNRecord>
380                                               // readData(idealStatePath);
381       initIS.setSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString(),
382           IdealStateModeProperty.CUSTOMIZED.toString());
383       initIS.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
384           Integer.toString(testInfo._numPartitionsPerDb));
385       initIS.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), STATE_MODEL);
386       initIS.setSimpleField(IdealStateProperty.REPLICAS.toString(), "" + testInfo._replica);
387       int totalStep = calcuateNumTransitions(initIS, destIS);
388       // LOG.info("initIS:" + initIS);
389       // LOG.info("destIS:" + destIS);
390       // LOG.info("totalSteps from initIS to destIS:" + totalStep);
391       // System.out.println("initIS:" + initIS);
392       // System.out.println("destIS:" + destIS);
393 
394       ZNRecord nextIS;
395       int step = totalStep * percentage / 100;
396       System.out.println("Resource:" + dbName + ", totalSteps from initIS to destIS:" + totalStep
397           + ", walk " + step + " steps(" + percentage + "%)");
398       nextIS = nextIdealState(initIS, destIS, step);
399       // testInfo._idealStateMap.put(dbName, nextIS);
400       String idealStatePath = PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName,
401           TEST_DB_PREFIX + i);
402       ZnodeOpArg arg = new ZnodeOpArg(idealStatePath, ZnodePropertyType.ZNODE, "+", nextIS);
403       TestCommand command = new TestCommand(CommandType.MODIFY, new TestTrigger(beginTime), arg);
404       commandList.add(command);
405     }
406 
407     TestExecutor.executeTestAsync(commandList, ZK_ADDR);
408 
409   }
410 
411   private static List<String[]> findAllUnfinishPairs(ZNRecord cur, ZNRecord dest)
412   {
413     // find all (host, resource) pairs that haven't reached destination state
414     List<String[]> list = new ArrayList<String[]>();
415     Map<String, Map<String, String>> map = dest.getMapFields();
416     for (Map.Entry<String, Map<String, String>> entry : map.entrySet())
417     {
418       String partitionName = entry.getKey();
419       Map<String, String> hostMap = entry.getValue();
420       for (Map.Entry<String, String> hostEntry : hostMap.entrySet())
421       {
422         String host = hostEntry.getKey();
423         String destState = hostEntry.getValue();
424         Map<String, String> curHostMap = cur.getMapField(partitionName);
425 
426         String curState = null;
427         if (curHostMap != null)
428         {
429           curState = curHostMap.get(host);
430         }
431 
432         String[] pair = new String[3];
433         if (curState == null)
434         {
435           if (destState.equalsIgnoreCase("SLAVE"))
436           {
437             pair[0] = new String(partitionName);
438             pair[1] = new String(host);
439             pair[2] = new String("1"); // number of transitions required
440             list.add(pair);
441           } else if (destState.equalsIgnoreCase("MASTER"))
442           {
443             pair[0] = new String(partitionName);
444             pair[1] = new String(host);
445             pair[2] = new String("2"); // number of transitions required
446             list.add(pair);
447           }
448         } else
449         {
450           if (curState.equalsIgnoreCase("SLAVE") && destState.equalsIgnoreCase("MASTER"))
451           {
452             pair[0] = new String(partitionName);
453             pair[1] = new String(host);
454             pair[2] = new String("1"); // number of transitions required
455             list.add(pair);
456           }
457         }
458       }
459     }
460     return list;
461   }
462 
463   private static int calcuateNumTransitions(ZNRecord start, ZNRecord end)
464   {
465     int totalSteps = 0;
466     List<String[]> list = findAllUnfinishPairs(start, end);
467     for (String[] pair : list)
468     {
469       totalSteps += Integer.parseInt(pair[2]);
470     }
471     return totalSteps;
472   }
473 
474   private static ZNRecord nextIdealState(final ZNRecord cur, final ZNRecord dest, final int steps)
475       throws PropertyStoreException
476   {
477     // get a deep copy
478     ZNRecord next = SERIALIZER.deserialize(SERIALIZER.serialize(cur));
479     List<String[]> list = findAllUnfinishPairs(cur, dest);
480 
481     // randomly pick up pairs that haven't reached destination state and
482     // progress
483     for (int i = 0; i < steps; i++)
484     {
485       int randomInt = RANDOM.nextInt(list.size());
486       String[] pair = list.get(randomInt);
487       String curState = null;
488       Map<String, String> curHostMap = next.getMapField(pair[0]);
489       if (curHostMap != null)
490       {
491         curState = curHostMap.get(pair[1]);
492       }
493       final String destState = dest.getMapField(pair[0]).get(pair[1]);
494 
495       // TODO generalize it using state-model
496       if (curState == null && destState != null)
497       {
498         Map<String, String> hostMap = next.getMapField(pair[0]);
499         if (hostMap == null)
500         {
501           hostMap = new HashMap<String, String>();
502         }
503         hostMap.put(pair[1], "SLAVE");
504         next.setMapField(pair[0], hostMap);
505       } else if (curState.equalsIgnoreCase("SLAVE") && destState != null
506           && destState.equalsIgnoreCase("MASTER"))
507       {
508         next.getMapField(pair[0]).put(pair[1], "MASTER");
509       } else
510       {
511         LOG.error("fail to calculate the next ideal state");
512       }
513       curState = next.getMapField(pair[0]).get(pair[1]);
514       if (curState != null && curState.equalsIgnoreCase(destState))
515       {
516         list.remove(randomInt);
517       }
518     }
519 
520     LOG.info("nextIS:" + next);
521     return next;
522   }
523 }