1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.Map;
26 import java.util.Map.Entry;
27
28 import org.apache.helix.PropertyType;
29 import org.apache.helix.TestHelper;
30 import org.apache.helix.TestHelper.StartCMResult;
31 import org.apache.helix.controller.HelixControllerMain;
32 import org.apache.helix.model.PauseSignal;
33 import org.apache.helix.tools.ClusterSetup;
34 import org.apache.helix.tools.ClusterStateVerifier;
35 import org.apache.log4j.Logger;
36 import org.testng.Assert;
37 import org.testng.annotations.AfterClass;
38 import org.testng.annotations.BeforeClass;
39 import org.testng.annotations.Test;
40
41
42 public class TestAddClusterV2 extends ZkIntegrationTestBase
43 {
44 private static Logger LOG = Logger.getLogger(TestAddClusterV2.class);
45
46 protected static final int CLUSTER_NR = 10;
47 protected static final int NODE_NR = 5;
48 protected static final int START_PORT = 12918;
49 protected static final String STATE_MODEL = "MasterSlave";
50 protected ClusterSetup _setupTool = null;
51 protected Map<String, StartCMResult> _startCMResultMap = new HashMap<String, StartCMResult>();
52
53 protected final String CLASS_NAME = getShortClassName();
54 protected final String CONTROLLER_CLUSTER = CONTROLLER_CLUSTER_PREFIX + "_" + CLASS_NAME;
55
56 protected static final String TEST_DB = "TestDB";
57
58 @BeforeClass
59 public void beforeClass() throws Exception
60 {
61 System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
62
63 String namespace = "/" + CONTROLLER_CLUSTER;
64 if (_gZkClient.exists(namespace))
65 {
66 _gZkClient.deleteRecursive(namespace);
67 }
68
69 for (int i = 0; i < CLUSTER_NR; i++)
70 {
71 namespace = "/" + CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
72 if (_gZkClient.exists(namespace))
73 {
74 _gZkClient.deleteRecursive(namespace);
75 }
76 }
77
78 _setupTool = new ClusterSetup(ZK_ADDR);
79
80
81
82 _setupTool.addCluster(CONTROLLER_CLUSTER, true);
83 for (int i = 0; i < NODE_NR; i++)
84 {
85 String controllerName = CONTROLLER_PREFIX + "_" + i;
86 _setupTool.addInstanceToCluster(CONTROLLER_CLUSTER, controllerName);
87 }
88
89
90
91 for (int i = 0; i < CLUSTER_NR; i++)
92 {
93 String clusterName = CLUSTER_PREFIX + "_" + CLASS_NAME + "_" + i;
94 _setupTool.addCluster(clusterName, true);
95 _setupTool.activateCluster(clusterName, CONTROLLER_CLUSTER, true);
96 }
97
98 final String firstCluster = CLUSTER_PREFIX + "_" + CLASS_NAME + "_0";
99 setupStorageCluster(_setupTool, firstCluster, TEST_DB, 20, PARTICIPANT_PREFIX,
100 START_PORT, "MasterSlave", 3, true);
101
102
103 for (int i = 0; i < 5; i++)
104 {
105 String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
106 if (_startCMResultMap.get(instanceName) != null)
107 {
108 LOG.error("fail to start participant:" + instanceName
109 + "(participant with the same name already running");
110 }
111 else
112 {
113 StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, firstCluster,
114 instanceName);
115 _startCMResultMap.put(instanceName, result);
116 }
117 }
118
119
120 for (int i = 0; i < 5; i++)
121 {
122 String controllerName = CONTROLLER_PREFIX + "_" + i;
123 if (_startCMResultMap.get(controllerName) != null)
124 {
125 LOG.error("fail to start controller:" + controllerName
126 + "(controller with the same name already running");
127 }
128 else
129 {
130 StartCMResult result = TestHelper.startController(CONTROLLER_CLUSTER,
131 controllerName,
132 ZK_ADDR,
133 HelixControllerMain.DISTRIBUTED);
134 _startCMResultMap.put(controllerName, result);
135 }
136 }
137
138 verifyClusters();
139 }
140
141 @Test
142 public void Test()
143 {
144
145 }
146
147 @AfterClass
148 public void afterClass() throws Exception
149 {
150 System.out.println("AFTERCLASS " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
151
152
153
154
155
156
157
158 String leader = getCurrentLeader(_gZkClient, CONTROLLER_CLUSTER);
159
160
161 StartCMResult result;
162
163 Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
164
165 while (it.hasNext())
166 {
167 String instanceName = it.next().getKey();
168 if (!instanceName.equals(leader) && instanceName.startsWith(CONTROLLER_PREFIX))
169 {
170 result = _startCMResultMap.get(instanceName);
171 result._manager.disconnect();
172 result._thread.interrupt();
173 it.remove();
174 }
175 verifyClusters();
176 }
177
178 result = _startCMResultMap.remove(leader);
179 result._manager.disconnect();
180 result._thread.interrupt();
181
182 it = _startCMResultMap.entrySet().iterator();
183 while (it.hasNext())
184 {
185 String instanceName = it.next().getKey();
186 result = _startCMResultMap.get(instanceName);
187 result._manager.disconnect();
188 result._thread.interrupt();
189 it.remove();
190 }
191
192 System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis()));
193 }
194
195
196
197
198
199
200 protected void verifyClusters()
201 {
202 boolean result = ClusterStateVerifier.verifyByPolling(
203 new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CONTROLLER_CLUSTER));
204 Assert.assertTrue(result);
205
206 result = ClusterStateVerifier.verifyByPolling(
207 new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_PREFIX + "_" + CLASS_NAME + "_0"));
208 Assert.assertTrue(result);
209 }
210
211 protected void setupStorageCluster(ClusterSetup setupTool, String clusterName,
212 String dbName, int partitionNr, String prefix, int startPort, String stateModel, int replica, boolean rebalance)
213 {
214 setupTool.addResourceToCluster(clusterName, dbName, partitionNr, stateModel);
215 for (int i = 0; i < NODE_NR; i++)
216 {
217 String instanceName = prefix + "_" + (startPort + i);
218 setupTool.addInstanceToCluster(clusterName, instanceName);
219 }
220 if(rebalance)
221 {
222 setupTool.rebalanceStorageCluster(clusterName, dbName, replica);
223 }
224 }
225 }