View Javadoc

1   package org.apache.helix.tools;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.Arrays;
24  import java.util.Date;
25  
26  import org.apache.helix.HelixException;
27  import org.apache.helix.PropertyKey;
28  import org.apache.helix.PropertyPathConfig;
29  import org.apache.helix.PropertyType;
30  import org.apache.helix.TestHelper;
31  import org.apache.helix.ZNRecord;
32  import org.apache.helix.ZkUnitTestBase;
33  import org.apache.helix.PropertyKey.Builder;
34  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
35  import org.apache.helix.manager.zk.ZNRecordSerializer;
36  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
37  import org.apache.helix.manager.zk.ZkClient;
38  import org.apache.helix.model.LiveInstance;
39  import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
40  import org.apache.helix.tools.ClusterSetup;
41  import org.apache.helix.util.HelixUtil;
42  import org.apache.log4j.Logger;
43  import org.testng.Assert;
44  import org.testng.AssertJUnit;
45  import org.testng.annotations.AfterClass;
46  import org.testng.annotations.BeforeClass;
47  import org.testng.annotations.BeforeMethod;
48  import org.testng.annotations.Test;
49  
50  
51  public class TestClusterSetup extends ZkUnitTestBase
52  {
53    private static Logger         LOG             =
54                                                      Logger.getLogger(TestClusterSetup.class);
55  
56    protected static final String CLUSTER_NAME    = "TestClusterSetup";
57    protected static final String TEST_DB         = "TestDB";
58    protected static final String INSTANCE_PREFIX = "instance_";
59    protected static final String STATE_MODEL     = "MasterSlave";
60    protected static final String TEST_NODE       = "testnode_1";
61  
62    ZkClient                      _zkClient;
63    ClusterSetup                  _clusterSetup;
64  
65    private static String[] createArgs(String str)
66    {
67      String[] split = str.split("[ ]+");
68      System.out.println(Arrays.toString(split));
69      return split;
70    }
71  
72    @BeforeClass()
73    public void beforeClass() throws IOException,
74        Exception
75    {
76      System.out.println("START TestClusterSetup.beforeClass() "
77          + new Date(System.currentTimeMillis()));
78  
79      _zkClient = new ZkClient(ZK_ADDR);
80      _zkClient.setZkSerializer(new ZNRecordSerializer());
81    }
82  
83    @AfterClass()
84    public void afterClass()
85    {
86      _zkClient.close();
87      System.out.println("END TestClusterSetup.afterClass() "
88          + new Date(System.currentTimeMillis()));
89    }
90  
91    @BeforeMethod()
92    public void setup()
93    {
94  
95      _zkClient.deleteRecursive("/" + CLUSTER_NAME);
96      _clusterSetup = new ClusterSetup(ZK_ADDR);
97      _clusterSetup.addCluster(CLUSTER_NAME, true);
98    }
99  
100   @Test()
101   public void testAddInstancesToCluster() throws Exception
102   {
103     String instanceAddresses[] = new String[3];
104     for (int i = 0; i < 3; i++)
105     {
106       String currInstance = INSTANCE_PREFIX + i;
107       instanceAddresses[i] = currInstance;
108     }
109     String nextInstanceAddress = INSTANCE_PREFIX + 3;
110 
111     _clusterSetup.addInstancesToCluster(CLUSTER_NAME, instanceAddresses);
112 
113     // verify instances
114     for (String instance : instanceAddresses)
115     {
116       verifyInstance(_zkClient,
117                      CLUSTER_NAME,
118                      instance,
119                      true);
120     }
121 
122     _clusterSetup.addInstanceToCluster(CLUSTER_NAME, nextInstanceAddress);
123     verifyInstance(_zkClient,
124                    CLUSTER_NAME,
125                    nextInstanceAddress,
126                    true);
127     // re-add
128     boolean caughtException = false;
129     try
130     {
131       _clusterSetup.addInstanceToCluster(CLUSTER_NAME, nextInstanceAddress);
132     }
133     catch (HelixException e)
134     {
135       caughtException = true;
136     }
137     AssertJUnit.assertTrue(caughtException);
138   }
139 
140   @Test()
141   public void testDisableDropInstancesFromCluster() throws Exception
142   {
143     testAddInstancesToCluster();
144     String instanceAddresses[] = new String[3];
145     for (int i = 0; i < 3; i++)
146     {
147       String currInstance = INSTANCE_PREFIX + i;
148       instanceAddresses[i] = currInstance;
149     }
150     String nextInstanceAddress = INSTANCE_PREFIX + 3;
151 
152     boolean caughtException = false;
153     // drop without disabling
154     try
155     {
156       _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
157     }
158     catch (HelixException e)
159     {
160       caughtException = true;
161     }
162     AssertJUnit.assertTrue(caughtException);
163 
164     // disable
165     _clusterSetup.getClusterManagementTool()
166                  .enableInstance(CLUSTER_NAME,
167                                  nextInstanceAddress,
168                                  false);
169     verifyEnabled(_zkClient,
170                   CLUSTER_NAME,
171                   nextInstanceAddress,
172                   false);
173 
174     // drop
175     _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
176     verifyInstance(_zkClient,
177                    CLUSTER_NAME,
178                    nextInstanceAddress,
179                    false);
180 
181     // re-drop
182     caughtException = false;
183     try
184     {
185       _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, nextInstanceAddress);
186     }
187     catch (HelixException e)
188     {
189       caughtException = true;
190     }
191     AssertJUnit.assertTrue(caughtException);
192     /*
193      * //drop a set _clusterSetup.getClusterManagementTool().enableInstances(CLUSTER_NAME,
194      * instanceAddresses, false); _clusterSetup.dropInstancesFromCluster(CLUSTER_NAME,
195      * instanceAddresses);
196      */
197 
198     // bad format disable, drop
199     String badFormatInstance = "badinstance";
200     caughtException = false;
201     try
202     {
203       _clusterSetup.getClusterManagementTool().enableInstance(CLUSTER_NAME,
204                                                               badFormatInstance,
205                                                               false);
206     }
207     catch (HelixException e)
208     {
209       caughtException = true;
210     }
211     AssertJUnit.assertTrue(caughtException);
212 
213     caughtException = false;
214     try
215     {
216       _clusterSetup.dropInstanceFromCluster(CLUSTER_NAME, badFormatInstance);
217     }
218     catch (HelixException e)
219     {
220       caughtException = true;
221     }
222     AssertJUnit.assertTrue(caughtException);
223   }
224 
225   @Test()
226   public void testAddResource() throws Exception
227   {
228     try
229     {
230       _clusterSetup.addResourceToCluster(CLUSTER_NAME, TEST_DB, 16, STATE_MODEL);
231     }
232     catch(Exception e)
233     {}
234     verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
235   }
236 
237   @Test()
238   public void testRemoveResource() throws Exception
239   {
240     _clusterSetup.setupTestCluster(CLUSTER_NAME);
241     verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
242     _clusterSetup.dropResourceFromCluster(CLUSTER_NAME, TEST_DB);
243     verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, false);
244   }
245 
246   @Test()
247   public void testRebalanceCluster() throws Exception
248   {
249     _clusterSetup.setupTestCluster(CLUSTER_NAME);
250     // testAddInstancesToCluster();
251     testAddResource();
252     _clusterSetup.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 4);
253     verifyReplication(_zkClient, CLUSTER_NAME, TEST_DB, 4);
254   }
255 
256   /*
257    * @Test (groups = {"unitTest"}) public void testPrintUsage() throws Exception { Options
258    * cliOptions = ClusterSetup.constructCommandLineOptions();
259    * ClusterSetup.printUsage(null); }
260    */
261 
262   @Test()
263   public void testParseCommandLinesArgs() throws Exception
264   {
265     // ClusterSetup
266     // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+ " help"));
267 
268     // wipe ZK
269     _zkClient.deleteRecursive("/" + CLUSTER_NAME);
270     _clusterSetup = new ClusterSetup(ZK_ADDR);
271 
272     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --addCluster "
273         + CLUSTER_NAME));
274 
275     // wipe again
276     _zkClient.deleteRecursive("/" + CLUSTER_NAME);
277     _clusterSetup = new ClusterSetup(ZK_ADDR);
278 
279     _clusterSetup.setupTestCluster(CLUSTER_NAME);
280 
281     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --addNode "
282         + CLUSTER_NAME + " " + TEST_NODE));
283     verifyInstance(_zkClient,
284                    CLUSTER_NAME,
285                    TEST_NODE,
286                    true);
287     try
288     {
289       ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
290         + " --addResource " + CLUSTER_NAME + " " + TEST_DB + " 4 " + STATE_MODEL));
291     }
292     catch(Exception e)
293     {
294       
295     }
296     verifyResource(_zkClient, CLUSTER_NAME, TEST_DB, true);
297     // ClusterSetup
298     // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --addNode node-1"));
299     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
300         + " --enableInstance " + CLUSTER_NAME + " "
301         + TEST_NODE + " true"));
302     verifyEnabled(_zkClient,
303                   CLUSTER_NAME,
304                   TEST_NODE,
305                   true);
306 
307     // TODO: verify list commands
308     /*
309      * ClusterSetup
310      * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listClusterInfo "
311      * +CLUSTER_NAME)); ClusterSetup
312      * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listClusters"));
313      * ClusterSetup
314      * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listInstanceInfo "
315      * +CLUSTER_NAME+" "+instanceColonToUnderscoreFormat(TEST_NODE))); ClusterSetup
316      * .processCommandLineArgs
317      * (createArgs("-zkSvr "+ZK_ADDR+" --listInstances "+CLUSTER_NAME)); ClusterSetup
318      * .processCommandLineArgs
319      * (createArgs("-zkSvr "+ZK_ADDR+" --listResourceInfo "+CLUSTER_NAME +" "+TEST_DB));
320      * ClusterSetup
321      * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listResources "
322      * +CLUSTER_NAME)); ClusterSetup
323      * .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --listStateModel "
324      * +CLUSTER_NAME+" "+STATE_MODEL)); ClusterSetup
325      * .processCommandLineArgs(createArgs("-zkSvr "
326      * +ZK_ADDR+" --listStateModels "+CLUSTER_NAME));
327      */
328     // ClusterSetup
329     // .processCommandLineArgs(createArgs("-zkSvr "+ZK_ADDR+" --rebalance "+CLUSTER_NAME+" "+TEST_DB+" 1"));
330     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR
331         + " --enableInstance " + CLUSTER_NAME + " "
332         + TEST_NODE + " false"));
333     verifyEnabled(_zkClient,
334                   CLUSTER_NAME,
335                   TEST_NODE,
336                   false);
337     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " --dropNode "
338         + CLUSTER_NAME + " " + TEST_NODE));
339   }
340 
341   @Test()
342   public void testSetGetConfig() throws Exception
343   {
344     String className = TestHelper.getTestClassName();
345     String methodName = TestHelper.getTestMethodName();
346     String clusterName = className + "_" + methodName;
347 
348     System.out.println("START " + clusterName + " at "
349         + new Date(System.currentTimeMillis()));
350 
351     // basic
352     _clusterSetup.addCluster(clusterName, true);
353     _clusterSetup.addInstanceToCluster(clusterName, "localhost_0");
354     String scopeArgs = clusterName + ",localhost_0";
355     String keyValueMap = "key1=value1,key2=value2";
356     String keys = "key1,key2";
357     _clusterSetup.setConfig(ConfigScopeProperty.PARTICIPANT, scopeArgs, keyValueMap);
358     String valuesStr = _clusterSetup.getConfig(ConfigScopeProperty.PARTICIPANT, scopeArgs, keys);
359     
360     // getConfig returns json-formatted key-value pairs
361     ZNRecord record = new ZNRecord(ConfigScopeProperty.PARTICIPANT.toString());
362     // record.setMapField(scopesStr,HelixUtil.parseCsvFormatedKeyValuePairs(propertiesStr));
363     record.getSimpleFields().putAll(HelixUtil.parseCsvFormatedKeyValuePairs(keyValueMap));
364     ZNRecordSerializer serializer = new ZNRecordSerializer();
365     Assert.assertEquals(valuesStr, new String(serializer.serialize(record)));
366 
367     System.out.println("END " + clusterName + " at "
368         + new Date(System.currentTimeMillis()));
369   }
370 
371   @Test
372   public void testEnableCluster() throws Exception
373   {
374     // Logger.getRootLogger().setLevel(Level.INFO);
375     String className = TestHelper.getTestClassName();
376     String methodName = TestHelper.getTestMethodName();
377     String clusterName = className + "_" + methodName;
378 
379     System.out.println("START " + clusterName + " at "
380         + new Date(System.currentTimeMillis()));
381 
382     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
383                             "localhost", // participant name prefix
384                             "TestDB", // resource name prefix
385                             1, // resources
386                             10, // partitions per resource
387                             5, // number of nodes
388                             3, // replicas
389                             "MasterSlave",
390                             true); // do rebalance
391 
392     // pause cluster
393     ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
394         "--enableCluster", clusterName, "false" });
395 
396     Builder keyBuilder = new Builder(clusterName);
397     boolean exists = _gZkClient.exists(keyBuilder.pause().getPath());
398     Assert.assertTrue(exists, "pause node under controller should be created");
399 
400     // resume cluster
401     ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
402         "--enableCluster", clusterName, "true" });
403 
404     exists = _gZkClient.exists(keyBuilder.pause().getPath());
405     Assert.assertFalse(exists, "pause node under controller should be removed");
406 
407     System.out.println("END " + clusterName + " at "
408         + new Date(System.currentTimeMillis()));
409 
410   }
411 
412   @Test
413   public void testDropInstance() throws Exception
414   {
415     // drop without stop, should throw exception
416     String className = TestHelper.getTestClassName();
417     String methodName = TestHelper.getTestMethodName();
418     String clusterName = className + "_" + methodName;
419 
420     System.out.println("START " + clusterName + " at "
421         + new Date(System.currentTimeMillis()));
422 
423     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
424                             "localhost", // participant name prefix
425                             "TestDB", // resource name prefix
426                             1, // resources
427                             10, // partitions per resource
428                             5, // number of nodes
429                             3, // replicas
430                             "MasterSlave",
431                             true); // do rebalance
432 
433     // add fake liveInstance
434     ZKHelixDataAccessor accessor =
435         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
436     Builder keyBuilder = new Builder(clusterName);
437     LiveInstance liveInstance = new LiveInstance("localhost_12918");
438     liveInstance.setSessionId("session_0");
439     liveInstance.setHelixVersion("version_0");
440     accessor.setProperty(keyBuilder.liveInstance("localhost_12918"), liveInstance);
441 
442     // drop without stop the process, should throw exception
443     try
444     {
445       ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
446           "--dropNode", clusterName, "localhost:12918" });
447       Assert.fail("Should throw exception since localhost_12918 is still in LIVEINSTANCES/");
448     }
449     catch (Exception e)
450     {
451       // OK
452     }
453     accessor.removeProperty(keyBuilder.liveInstance("localhost_12918"));
454 
455     // drop without disable, should throw exception
456     try
457     {
458       ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
459           "--dropNode", clusterName, "localhost:12918" });
460       Assert.fail("Should throw exception since localhost_12918 is enabled");
461     }
462     catch (Exception e)
463     {
464       // e.printStackTrace();
465       // OK
466     }
467 
468     // drop it
469     ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR,
470         "--enableInstance", clusterName, "localhost_12918", "false" });
471     ClusterSetup.processCommandLineArgs(new String[] { "--zkSvr", ZK_ADDR, "--dropNode",
472         clusterName, "localhost:12918" });
473 
474     Assert.assertNull(accessor.getProperty(keyBuilder.instanceConfig("localhost_12918")),
475                       "Instance config should be dropped");
476     Assert.assertFalse(_gZkClient.exists(PropertyPathConfig.getPath(PropertyType.INSTANCES,
477                                                                     clusterName,
478                                                                     "localhost_12918")),
479                        "Instance/host should be dropped");
480     
481     System.out.println("END " + clusterName + " at "
482         + new Date(System.currentTimeMillis()));
483 
484   }
485   
486 }