View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  
28  import org.apache.helix.TestHelper;
29  import org.apache.helix.TestHelper.StartCMResult;
30  import org.apache.helix.controller.HelixControllerMain;
31  import org.apache.helix.manager.zk.ZNRecordSerializer;
32  import org.apache.helix.manager.zk.ZkClient;
33  import org.apache.helix.tools.ClusterSetup;
34  import org.apache.helix.tools.ClusterStateVerifier;
35  import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
36  import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
37  import org.apache.log4j.Logger;
38  import org.testng.Assert;
39  import org.testng.annotations.AfterClass;
40  import org.testng.annotations.BeforeClass;
41  
42  
43  /**
44   * 
45   * setup a storage cluster and start a zk-based cluster controller in stand-alone mode
46   * start 5 dummy participants verify the current states at end
47   */
48  
49  public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase
50  {
51    private static Logger                LOG               =
52                                                               Logger.getLogger(ZkStandAloneCMTestBase.class);
53  
54    protected static final int           NODE_NR           = 5;
55    protected static final int           START_PORT        = 12918;
56    protected static final String        STATE_MODEL       = "MasterSlave";
57    protected static final String        TEST_DB           = "TestDB";
58    protected static final int           _PARTITIONS       = 20;
59  
60    protected ClusterSetup               _setupTool        = null;
61    protected final String               CLASS_NAME        = getShortClassName();
62    protected final String               CLUSTER_NAME      = CLUSTER_PREFIX + "_"
63                                                               + CLASS_NAME;
64  
65    protected Map<String, StartCMResult> _startCMResultMap =
66                                                               new HashMap<String, StartCMResult>();
67    protected ZkClient                   _zkClient;
68    
69    int _replica = 3;
70  
71    @BeforeClass
72    public void beforeClass() throws Exception
73    {
74  //    Logger.getRootLogger().setLevel(Level.INFO);
75      System.out.println("START " + CLASS_NAME + " at "
76          + new Date(System.currentTimeMillis()));
77  
78      _zkClient = new ZkClient(ZK_ADDR);
79      _zkClient.setZkSerializer(new ZNRecordSerializer());
80      String namespace = "/" + CLUSTER_NAME;
81      if (_zkClient.exists(namespace))
82      {
83        _zkClient.deleteRecursive(namespace);
84      }
85      _setupTool = new ClusterSetup(ZK_ADDR);
86  
87      // setup storage cluster
88      _setupTool.addCluster(CLUSTER_NAME, true);
89      _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
90      for (int i = 0; i < NODE_NR; i++)
91      {
92        String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
93        _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
94      }
95      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
96  
97      // start dummy participants
98      for (int i = 0; i < NODE_NR; i++)
99      {
100       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
101       if (_startCMResultMap.get(instanceName) != null)
102       {
103         LOG.error("fail to start particpant:" + instanceName
104             + "(participant with same name already exists)");
105       }
106       else
107       {
108         StartCMResult result =
109             TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
110         _startCMResultMap.put(instanceName, result);
111       }
112     }
113 
114     // start controller
115     String controllerName = CONTROLLER_PREFIX + "_0";
116     StartCMResult startResult =
117         TestHelper.startController(CLUSTER_NAME,
118                                    controllerName,
119                                    ZK_ADDR,
120                                    HelixControllerMain.STANDALONE);
121     _startCMResultMap.put(controllerName, startResult);
122     
123     boolean result =
124         ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
125                                                                               CLUSTER_NAME));
126 
127     result =
128         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
129                                                                                  CLUSTER_NAME));
130     Assert.assertTrue(result);
131   }
132 
133   @AfterClass
134   public void afterClass() throws Exception
135   {
136     /**
137      * shutdown order: 1) disconnect the controller 2) disconnect participants
138      */
139    
140     StartCMResult result;
141     Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
142     while (it.hasNext())
143     {
144       String instanceName = it.next().getKey();
145       if (instanceName.startsWith(CONTROLLER_PREFIX))
146       {
147         result = _startCMResultMap.get(instanceName);
148         result._manager.disconnect();
149         result._thread.interrupt();
150         it.remove();
151       }
152     }
153 
154     Thread.sleep(100);
155     it = _startCMResultMap.entrySet().iterator();
156     while (it.hasNext())
157     {
158       String instanceName = it.next().getKey();
159       result = _startCMResultMap.get(instanceName);
160       result._manager.disconnect();
161       result._thread.interrupt();
162       it.remove();
163     }
164 
165     _zkClient.close();
166     // logger.info("END at " + new Date(System.currentTimeMillis()));
167     System.out.println("END " + CLASS_NAME + " at "
168         + new Date(System.currentTimeMillis()));
169   }
170 }