View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  
24  import org.apache.helix.TestHelper;
25  import org.apache.helix.ZNRecord;
26  import org.apache.helix.PropertyKey.Builder;
27  import org.apache.helix.controller.HelixControllerMain;
28  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
29  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
30  import org.apache.helix.mock.controller.ClusterController;
31  import org.apache.helix.mock.participant.MockParticipant;
32  import org.apache.helix.model.LiveInstance;
33  import org.apache.helix.tools.ClusterSetup;
34  import org.apache.helix.tools.ClusterStateVerifier;
35  import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
36  import org.testng.Assert;
37  import org.testng.annotations.Test;
38  
39  
40  public class TestDistributedCMMain extends ZkIntegrationTestBase
41  {
42  
43    @Test
44    public void testDistributedCMMain() throws Exception
45    {
46      String className = TestHelper.getTestClassName();
47      String methodName = TestHelper.getTestMethodName();
48      String clusterNamePrefix = className + "_" + methodName;
49      final int n = 5;
50      final int clusterNb = 10;
51  
52      System.out.println("START " + clusterNamePrefix + " at "
53          + new Date(System.currentTimeMillis()));
54  
55      // setup 10 clusters
56      for (int i = 0; i < clusterNb; i++)
57      {
58        String clusterName = clusterNamePrefix + "0_" + i;
59        String participantName = "localhost" + i;
60        String resourceName = "TestDB" + i;
61        TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
62                                participantName, // participant name prefix
63                                resourceName, // resource name prefix
64                                1, // resources
65                                8, // partitions per resource
66                                n, // number of nodes
67                                3, // replicas
68                                "MasterSlave",
69                                true); // do rebalance
70      }
71  
72      // setup controller cluster
73      final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
74      TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, ZK_ADDR, 0, // controller
75                                                                             // port
76                              "controller", // participant name prefix
77                              clusterNamePrefix, // resource name prefix
78                              1, // resources
79                              clusterNb, // partitions per resource
80                              n, // number of nodes
81                              3, // replicas
82                              "LeaderStandby",
83                              true); // do rebalance
84  
85      // start distributed cluster controllers
86      ClusterController[] controllers = new ClusterController[n + n];
87      for (int i = 0; i < n; i++)
88      {
89        controllers[i] =
90            new ClusterController(controllerClusterName,
91                                  "controller_" + i,
92                                  ZK_ADDR,
93                                  HelixControllerMain.DISTRIBUTED.toString());
94        controllers[i].syncStart();
95      }
96  
97      boolean result =
98          ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
99                                                                                                        controllerClusterName),
100                                                 30000);
101     Assert.assertTrue(result, "Controller cluster NOT in ideal state");
102 
103     // start first cluster
104     MockParticipant[] participants = new MockParticipant[n];
105     final String firstClusterName = clusterNamePrefix + "0_0";
106     for (int i = 0; i < n; i++)
107     {
108       String instanceName = "localhost0_" + (12918 + i);
109       participants[i] =
110           new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
111       participants[i].syncStart();
112     }
113 
114     result =
115         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
116                                                                                  firstClusterName));
117     Assert.assertTrue(result, "first cluster NOT in ideal state");
118     
119     
120     // add more controllers to controller cluster
121     ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
122     for (int i = 0; i < n; i++)
123     {
124       String controller = "controller_" + (n + i);
125       setupTool.addInstanceToCluster(controllerClusterName, controller);
126     }
127     setupTool.rebalanceStorageCluster(controllerClusterName, clusterNamePrefix + "0", 6);
128     for (int i = n; i < 2 * n; i++)
129     {
130       controllers[i] =
131           new ClusterController(controllerClusterName,
132                                 "controller_" + i,
133                                 ZK_ADDR,
134                                 HelixControllerMain.DISTRIBUTED.toString());
135       controllers[i].syncStart();
136     }
137 
138     
139     // verify controller cluster
140     result =
141         ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
142                                                                                                       controllerClusterName));
143     Assert.assertTrue(result, "Controller cluster NOT in ideal state");
144     
145     // verify first cluster
146     result =
147         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
148                                                                                  firstClusterName));
149     Assert.assertTrue(result, "first cluster NOT in ideal state");
150 
151     
152     // stop controller_0-5
153     ZkBaseDataAccessor<ZNRecord> baseAccessor =
154         new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
155     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, baseAccessor);
156     Builder keyBuilder = accessor.keyBuilder();
157     for (int i = 0; i < n; i++)
158     {
159       LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
160       String leaderName = leader.getId();
161       int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
162       controllers[j].syncStop();
163       
164       result =
165           ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
166                                                                                                         controllerClusterName));
167       Assert.assertTrue(result, "Controller cluster NOT in ideal state");
168       
169       result =
170           ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
171                                                                                    firstClusterName));
172       Assert.assertTrue(result, "first cluster NOT in ideal state");
173     }
174 
175     
176     // clean up
177     // wait for all zk callbacks done
178     System.out.println("Cleaning up...");
179     Thread.sleep(2000);
180     for (int i = 0; i < 5; i++)
181     {
182       result =
183           ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
184                                                                                                         controllerClusterName));
185       controllers[i].syncStop();
186     }
187     
188     // Thread.sleep(2000);
189     for (int i = 0; i < 5; i++)
190     {
191       participants[i].syncStop();
192     }
193 
194     System.out.println("END " + clusterNamePrefix + " at "
195         + new Date(System.currentTimeMillis()));
196 
197   }
198 }