View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  
24  import org.apache.helix.TestHelper;
25  import org.apache.helix.ZNRecord;
26  import org.apache.helix.PropertyKey.Builder;
27  import org.apache.helix.controller.HelixControllerMain;
28  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
29  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
30  import org.apache.helix.mock.controller.ClusterController;
31  import org.apache.helix.mock.participant.MockParticipant;
32  import org.apache.helix.model.LiveInstance;
33  import org.apache.helix.tools.ClusterStateVerifier;
34  import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
35  import org.testng.Assert;
36  import org.testng.annotations.Test;
37  
38  
39  public class TestDistributedClusterController extends ZkIntegrationTestBase
40  {
41  
42    @Test
43    public void testDistributedClusterController() throws Exception
44    {
45      String className = TestHelper.getTestClassName();
46      String methodName = TestHelper.getTestMethodName();
47      String clusterNamePrefix = className + "_" + methodName;
48      final int n = 5;
49      final int clusterNb = 10;
50  
51      System.out.println("START " + clusterNamePrefix + " at "
52          + new Date(System.currentTimeMillis()));
53  
54      // setup 10 clusters
55      for (int i = 0; i < clusterNb; i++)
56      {
57        String clusterName = clusterNamePrefix + "0_" + i;
58        String participantName = "localhost" + i;
59        String resourceName = "TestDB" + i;
60        TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
61                                participantName, // participant name prefix
62                                resourceName, // resource name prefix
63                                1, // resources
64                                8, // partitions per resource
65                                n, // number of nodes
66                                3, // replicas
67                                "MasterSlave",
68                                true); // do rebalance
69      }
70  
71      // setup controller cluster
72      final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
73      TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, ZK_ADDR, 0, // controller
74                                                                             // port
75                              "controller", // participant name prefix
76                              clusterNamePrefix, // resource name prefix
77                              1, // resources
78                              clusterNb, // partitions per resource
79                              n, // number of nodes
80                              3, // replicas
81                              "LeaderStandby",
82                              true); // do rebalance
83  
84      // start distributed cluster controllers
85      ClusterController[] controllers = new ClusterController[n];
86      for (int i = 0; i < n; i++)
87      {
88        controllers[i] =
89            new ClusterController(controllerClusterName,
90                                  "controller_" + i,
91                                  ZK_ADDR,
92                                  HelixControllerMain.DISTRIBUTED.toString());
93        controllers[i].syncStart();
94      }
95  
96      boolean result =
97          ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
98                                                                                                        controllerClusterName),
99                                                  30000);
100     Assert.assertTrue(result, "Controller cluster NOT in ideal state");
101 
102     // start first cluster
103     MockParticipant[] participants = new MockParticipant[n];
104     final String firstClusterName = clusterNamePrefix + "0_0";
105     for (int i = 0; i < n; i++)
106     {
107       String instanceName = "localhost0_" + (12918 + i);
108       participants[i] =
109           new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
110       participants[i].syncStart();
111     }
112 
113     result =
114         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
115                                                                                  firstClusterName));
116     Assert.assertTrue(result, "first cluster NOT in ideal state");
117 
118     
119     // stop current leader in controller cluster
120     ZkBaseDataAccessor<ZNRecord> baseAccessor =
121         new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
122     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, baseAccessor);
123     Builder keyBuilder = accessor.keyBuilder();
124     LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
125     String leaderName = leader.getId();
126     int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
127     controllers[j].syncStop();
128     
129     
130     // setup the second cluster
131     MockParticipant[] participants2 = new MockParticipant[n];
132     final String secondClusterName = clusterNamePrefix + "0_1";
133     for (int i = 0; i < n; i++)
134     {
135       String instanceName = "localhost1_" + (12918 + i);
136       participants2[i] =
137           new MockParticipant(secondClusterName, instanceName, ZK_ADDR, null);
138       participants2[i].syncStart();
139     }
140 
141     result =
142         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
143                                                                                  secondClusterName));
144     Assert.assertTrue(result, "second cluster NOT in ideal state");
145 
146     // clean up
147     // wait for all zk callbacks done
148     System.out.println("Cleaning up...");
149     Thread.sleep(1000);
150     for (int i = 0; i < 5; i++)
151     {
152       result =
153           ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
154                                                                                                         controllerClusterName));
155       controllers[i].syncStop();
156     }
157 
158     for (int i = 0; i < 5; i++)
159     {
160       participants[i].syncStop();
161     }
162 
163     System.out.println("END " + clusterNamePrefix + " at "
164         + new Date(System.currentTimeMillis()));
165 
166   }
167 }