View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  
28  import org.apache.helix.PropertyType;
29  import org.apache.helix.TestHelper;
30  import org.apache.helix.TestHelper.StartCMResult;
31  import org.apache.helix.controller.HelixControllerMain;
32  import org.apache.helix.model.PauseSignal;
33  import org.apache.helix.tools.ClusterSetup;
34  import org.apache.helix.tools.ClusterStateVerifier;
35  import org.apache.log4j.Logger;
36  import org.testng.Assert;
37  import org.testng.annotations.AfterClass;
38  import org.testng.annotations.BeforeClass;
39  import org.testng.annotations.Test;
40  
41  
42  public class TestAddClusterV2 extends ZkIntegrationTestBase
43  {
44    private static Logger LOG = Logger.getLogger(TestAddClusterV2.class);
45  
46    protected static final int CLUSTER_NR = 10;
47    protected static final int NODE_NR = 5;
48    protected static final int START_PORT = 12918;
49    protected static final String STATE_MODEL = "MasterSlave";
50    protected ClusterSetup _setupTool = null;
51    protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
52  
53    protected final String CLASS_NAME = getShortClassName();
54    protected final String CONTROLLER_CLUSTER = CONTROLLER_CLUSTER_PREFIX + "_" + CLASS_NAME;
55  
56    protected static final String TEST_DB = "TestDB";
57  
58    @BeforeClass
59    public void beforeClass() throws Exception
60    {
61      System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
62  
63      String namespace = "/" + CONTROLLER_CLUSTER;
64      if (_gZkClient.exists(namespace))
65      {
66        _gZkClient.deleteRecursive(namespace);
67      }
68  
69      for (int i = 0; i < CLUSTER_NR; i++)
70      {
71        namespace = "/" + CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
72        if (_gZkClient.exists(namespace))
73        {
74          _gZkClient.deleteRecursive(namespace);
75        }
76      }
77  
78      _setupTool = new ClusterSetup(ZK_ADDR);
79  
80  
81      // setup CONTROLLER_CLUSTER
82      _setupTool.addCluster(CONTROLLER_CLUSTER, true);
83      for (int i = 0; i < NODE_NR; i++)
84      {
85        String controllerName = CONTROLLER_PREFIX + "_" + i;
86        _setupTool.addInstanceToCluster(CONTROLLER_CLUSTER, controllerName);
87      }
88  
89  
90      // setup cluster of clusters
91      for (int i = 0; i < CLUSTER_NR; i++)
92      {
93        String clusterName = CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
94        _setupTool.addCluster(clusterName, true);
95        _setupTool.activateCluster(clusterName,  CONTROLLER_CLUSTER, true);
96      }
97  
98      final String firstCluster = CLUSTER_PREFIX + "_" + CLASS_NAME + "_0";
99      setupStorageCluster(_setupTool, firstCluster, TEST_DB, 20, PARTICIPANT_PREFIX,
100                         START_PORT, "MasterSlave", 3, true);
101 
102     // start dummy participants for the first cluster
103     for (int i = 0; i < 5; i++)
104     {
105       String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
106       if (_startCMResultMap.get(instanceName) != null)
107       {
108         LOG.error("fail to start participant:" + instanceName
109                      + "(participant with the same name already running");
110       }
111       else
112       {
113         StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, firstCluster,
114                                                             instanceName);
115         _startCMResultMap.put(instanceName, result);
116       }
117     }
118 
119     // start distributed cluster controllers
120     for (int i = 0; i < 5; i++)
121     {
122       String controllerName = CONTROLLER_PREFIX + "_" + i;
123       if (_startCMResultMap.get(controllerName) != null)
124       {
125         LOG.error("fail to start controller:" + controllerName
126                      + "(controller with the same name already running");
127       }
128       else
129       {
130         StartCMResult result = TestHelper.startController(CONTROLLER_CLUSTER,
131                                                                  controllerName,
132                                                                  ZK_ADDR,
133                                                                  HelixControllerMain.DISTRIBUTED);
134         _startCMResultMap.put(controllerName, result);
135       }
136     }
137 
138     verifyClusters();
139   }
140 
141   @Test
142   public void Test()
143   {
144 
145   }
146 
147   @AfterClass
148   public void afterClass() throws Exception
149   {
150     System.out.println("AFTERCLASS " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
151 
152     /**
153      * shutdown order:
154      *   1) pause the leader (optional)
155      *   2) disconnect all controllers
156      *   3) disconnect leader/disconnect participant
157      */
158     String leader = getCurrentLeader(_gZkClient, CONTROLLER_CLUSTER);
159     // pauseController(_startCMResultMap.get(leader)._manager.getDataAccessor());
160 
161     StartCMResult result;
162 
163     Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
164 
165     while (it.hasNext())
166     {
167       String instanceName = it.next().getKey();
168       if (!instanceName.equals(leader) && instanceName.startsWith(CONTROLLER_PREFIX))
169       {
170         result = _startCMResultMap.get(instanceName);
171         result._manager.disconnect();
172         result._thread.interrupt();
173         it.remove();
174       }
175       verifyClusters();
176     }
177 
178     result = _startCMResultMap.remove(leader);
179     result._manager.disconnect();
180     result._thread.interrupt();
181 
182     it = _startCMResultMap.entrySet().iterator();
183     while (it.hasNext())
184     {
185       String instanceName = it.next().getKey();
186       result = _startCMResultMap.get(instanceName);
187       result._manager.disconnect();
188       result._thread.interrupt();
189       it.remove();
190     }
191 
192     System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
193   }
194 
195 
196   /**
197    * verify the external view (against the best possible state)
198    *   in the controller cluster and the first cluster
199    */
200   protected void verifyClusters()
201   {
202     boolean result = ClusterStateVerifier.verifyByPolling(
203         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CONTROLLER_CLUSTER));
204     Assert.assertTrue(result);
205     
206     result = ClusterStateVerifier.verifyByPolling(
207         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_PREFIX + "_" + CLASS_NAME + "_0"));
208     Assert.assertTrue(result);
209   }
210 
211   protected void setupStorageCluster(ClusterSetup setupTool, String clusterName,
212        String dbName, int partitionNr, String prefix, int startPort, String stateModel, int replica, boolean rebalance)
213   {
214     setupTool.addResourceToCluster(clusterName, dbName, partitionNr, stateModel);
215     for (int i = 0; i < NODE_NR; i++)
216     {
217       String instanceName = prefix + "_" + (startPort + i);
218       setupTool.addInstanceToCluster(clusterName, instanceName);
219     }
220     if(rebalance)
221     {
222       setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
223     }
224   }
225 }