1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23
24 import org.apache.helix.TestHelper;
25 import org.apache.helix.ZNRecord;
26 import org.apache.helix.PropertyKey.Builder;
27 import org.apache.helix.controller.HelixControllerMain;
28 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
29 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
30 import org.apache.helix.mock.controller.ClusterController;
31 import org.apache.helix.mock.participant.MockParticipant;
32 import org.apache.helix.model.LiveInstance;
33 import org.apache.helix.tools.ClusterSetup;
34 import org.apache.helix.tools.ClusterStateVerifier;
35 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
36 import org.testng.Assert;
37 import org.testng.annotations.Test;
38
39
40 public class TestDistributedCMMain extends ZkIntegrationTestBase
41 {
42
43 @Test
44 public void testDistributedCMMain() throws Exception
45 {
46 String className = TestHelper.getTestClassName();
47 String methodName = TestHelper.getTestMethodName();
48 String clusterNamePrefix = className + "_" + methodName;
49 final int n = 5;
50 final int clusterNb = 10;
51
52 System.out.println("START " + clusterNamePrefix + " at "
53 + new Date(System.currentTimeMillis()));
54
55
56 for (int i = 0; i < clusterNb; i++)
57 {
58 String clusterName = clusterNamePrefix + "0_" + i;
59 String participantName = "localhost" + i;
60 String resourceName = "TestDB" + i;
61 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918,
62 participantName,
63 resourceName,
64 1,
65 8,
66 n,
67 3,
68 "MasterSlave",
69 true);
70 }
71
72
73 final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
74 TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, ZK_ADDR, 0,
75
76 "controller",
77 clusterNamePrefix,
78 1,
79 clusterNb,
80 n,
81 3,
82 "LeaderStandby",
83 true);
84
85
86 ClusterController[] controllers = new ClusterController[n + n];
87 for (int i = 0; i < n; i++)
88 {
89 controllers[i] =
90 new ClusterController(controllerClusterName,
91 "controller_" + i,
92 ZK_ADDR,
93 HelixControllerMain.DISTRIBUTED.toString());
94 controllers[i].syncStart();
95 }
96
97 boolean result =
98 ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
99 controllerClusterName),
100 30000);
101 Assert.assertTrue(result, "Controller cluster NOT in ideal state");
102
103
104 MockParticipant[] participants = new MockParticipant[n];
105 final String firstClusterName = clusterNamePrefix + "0_0";
106 for (int i = 0; i < n; i++)
107 {
108 String instanceName = "localhost0_" + (12918 + i);
109 participants[i] =
110 new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
111 participants[i].syncStart();
112 }
113
114 result =
115 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
116 firstClusterName));
117 Assert.assertTrue(result, "first cluster NOT in ideal state");
118
119
120
121 ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
122 for (int i = 0; i < n; i++)
123 {
124 String controller = "controller_" + (n + i);
125 setupTool.addInstanceToCluster(controllerClusterName, controller);
126 }
127 setupTool.rebalanceStorageCluster(controllerClusterName, clusterNamePrefix + "0", 6);
128 for (int i = n; i < 2 * n; i++)
129 {
130 controllers[i] =
131 new ClusterController(controllerClusterName,
132 "controller_" + i,
133 ZK_ADDR,
134 HelixControllerMain.DISTRIBUTED.toString());
135 controllers[i].syncStart();
136 }
137
138
139
140 result =
141 ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
142 controllerClusterName));
143 Assert.assertTrue(result, "Controller cluster NOT in ideal state");
144
145
146 result =
147 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
148 firstClusterName));
149 Assert.assertTrue(result, "first cluster NOT in ideal state");
150
151
152
153 ZkBaseDataAccessor<ZNRecord> baseAccessor =
154 new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
155 ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, baseAccessor);
156 Builder keyBuilder = accessor.keyBuilder();
157 for (int i = 0; i < n; i++)
158 {
159 LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
160 String leaderName = leader.getId();
161 int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
162 controllers[j].syncStop();
163
164 result =
165 ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
166 controllerClusterName));
167 Assert.assertTrue(result, "Controller cluster NOT in ideal state");
168
169 result =
170 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
171 firstClusterName));
172 Assert.assertTrue(result, "first cluster NOT in ideal state");
173 }
174
175
176
177
178 System.out.println("Cleaning up...");
179 Thread.sleep(2000);
180 for (int i = 0; i < 5; i++)
181 {
182 result =
183 ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
184 controllerClusterName));
185 controllers[i].syncStop();
186 }
187
188
189 for (int i = 0; i < 5; i++)
190 {
191 participants[i].syncStop();
192 }
193
194 System.out.println("END " + clusterNamePrefix + " at "
195 + new Date(System.currentTimeMillis()));
196
197 }
198 }