1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.TestHelper.StartCMResult;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.integration.TestAutoRebalancePartitionLimit.ExternalViewBalancedVerifier;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.IdealStateModeProperty;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
49
50
51 public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerCheck
52 {
53 private static final Logger LOG = Logger.getLogger(TestAutoRebalance.class.getName());
54 String db2 = TEST_DB+"2";
55 String _tag = "SSDSSD";
56 @BeforeClass
57 public void beforeClass() throws Exception
58 {
59
60 System.out.println("START " + CLASS_NAME + " at "
61 + new Date(System.currentTimeMillis()));
62
63 _zkClient = new ZkClient(ZK_ADDR);
64 _zkClient.setZkSerializer(new ZNRecordSerializer());
65 String namespace = "/" + CLUSTER_NAME;
66 if (_zkClient.exists(namespace))
67 {
68 _zkClient.deleteRecursive(namespace);
69 }
70 _setupTool = new ClusterSetup(ZK_ADDR);
71
72
73 _setupTool.addCluster(CLUSTER_NAME, true);
74 _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL, IdealStateModeProperty.AUTO_REBALANCE+"");
75
76 _setupTool.addResourceToCluster(CLUSTER_NAME, db2, _PARTITIONS, "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE+"");
77
78
79 for (int i = 0; i < NODE_NR; i++)
80 {
81 String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
82 _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
83 }
84
85 _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
86
87 for (int i = 0; i < 3; i++)
88 {
89 String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
90 _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, _tag);
91 }
92
93 _setupTool.rebalanceCluster(CLUSTER_NAME, db2, 1, "ucpx",_tag);
94
95
96 for (int i = 0; i < NODE_NR; i++)
97 {
98 String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
99 if (_startCMResultMap.get(instanceName) != null)
100 {
101 LOG.error("fail to start particpant:" + instanceName
102 + "(participant with same name already exists)");
103 }
104 else
105 {
106 StartCMResult result =
107 TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
108 _startCMResultMap.put(instanceName, result);
109 }
110 }
111
112
113 String controllerName = CONTROLLER_PREFIX + "_0";
114 StartCMResult startResult =
115 TestHelper.startController(CLUSTER_NAME,
116 controllerName,
117 ZK_ADDR,
118 HelixControllerMain.STANDALONE);
119 _startCMResultMap.put(controllerName, startResult);
120
121 boolean result =
122 ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
123 CLUSTER_NAME, TEST_DB));
124
125 Assert.assertTrue(result);
126
127
128 }
129
130
131 @Test()
132 public void testDropResourceAutoRebalance() throws Exception
133 {
134
135 _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", _PARTITIONS, "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE+"");
136
137 _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", 1);
138
139 boolean result =
140 ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
141 CLUSTER_NAME, "MyDB"));
142 Assert.assertTrue(result);
143
144 String command =
145 "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + "MyDB";
146 ClusterSetup.processCommandLineArgs(command.split(" "));
147
148 TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
149 30 * 1000,
150 CLUSTER_NAME,
151 "MyDB",
152 TestHelper.<String> setOf("localhost_12918",
153 "localhost_12919",
154 "localhost_12920",
155 "localhost_12921",
156 "localhost_12922"),
157 ZK_ADDR);
158
159
160 _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB2", _PARTITIONS, "MasterSlave", IdealStateModeProperty.AUTO_REBALANCE+"");
161
162 _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB2", 3);
163
164 result =
165 ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
166 CLUSTER_NAME, "MyDB2"));
167 Assert.assertTrue(result);
168
169 command =
170 "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + "MyDB2";
171 ClusterSetup.processCommandLineArgs(command.split(" "));
172
173 TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
174 30 * 1000,
175 CLUSTER_NAME,
176 "MyDB2",
177 TestHelper.<String> setOf("localhost_12918",
178 "localhost_12919",
179 "localhost_12920",
180 "localhost_12921",
181 "localhost_12922"),
182 ZK_ADDR);
183 }
184
185
186
187
188 @Test()
189 public void testAutoRebalance() throws Exception
190 {
191
192
193 String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
194 _startCMResultMap.get(instanceName)._manager.disconnect();
195 Thread.currentThread().sleep(1000);
196 _startCMResultMap.get(instanceName)._thread.interrupt();
197
198
199 boolean result =
200 ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
201 CLUSTER_NAME, TEST_DB));
202 Assert.assertTrue(result);
203
204
205 for (int i = 0; i < 2; i++)
206 {
207 String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
208 _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
209
210 StartCMResult resultx =
211 TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
212 _startCMResultMap.put(storageNodeName, resultx);
213 }
214 Thread.sleep(1000);
215 result =
216 ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
217 CLUSTER_NAME, TEST_DB));
218 Assert.assertTrue(result);
219
220 result =
221 ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
222 CLUSTER_NAME, db2));
223 Assert.assertTrue(result);
224 HelixDataAccessor accessor = new ZKHelixDataAccessor( CLUSTER_NAME, new ZkBaseDataAccessor(_zkClient));
225 Builder keyBuilder = accessor.keyBuilder();
226 ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
227 Set<String> instancesSet = new HashSet<String>();
228 for(String partitionName : ev.getRecord().getMapFields().keySet())
229 {
230 Map<String, String> assignmentMap = ev.getRecord().getMapField(partitionName);
231 for(String instance : assignmentMap.keySet())
232 {
233 instancesSet.add(instance);
234 }
235 }
236 Assert.assertEquals(instancesSet.size(), 2);
237 }
238
239 static boolean verifyBalanceExternalView(ZNRecord externalView, int partitionCount, String masterState, int replica, int instances)
240 {
241 if(externalView == null)
242 {
243 return false;
244 }
245 Map<String, Integer> masterPartitionsCountMap = new HashMap<String, Integer>();
246 for(String partitionName : externalView.getMapFields().keySet())
247 {
248 Map<String, String> assignmentMap = externalView.getMapField(partitionName);
249
250 for(String instance : assignmentMap.keySet())
251 {
252 if(assignmentMap.get(instance).equals(masterState))
253 {
254 if(!masterPartitionsCountMap.containsKey(instance))
255 {
256 masterPartitionsCountMap.put(instance, 0);
257 }
258 masterPartitionsCountMap.put(instance, masterPartitionsCountMap.get(instance) + 1);
259 }
260 }
261 }
262
263 int perInstancePartition = partitionCount / instances;
264
265 int totalCount = 0;
266 for(String instanceName : masterPartitionsCountMap.keySet())
267 {
268 int instancePartitionCount = masterPartitionsCountMap.get(instanceName);
269 totalCount += instancePartitionCount;
270 if(!(instancePartitionCount == perInstancePartition || instancePartitionCount == perInstancePartition +1 ))
271 {
272 return false;
273 }
274 if(instancePartitionCount == perInstancePartition +1)
275 {
276 if(partitionCount % instances == 0)
277 {
278 return false;
279 }
280 }
281 }
282 if(partitionCount != totalCount)
283 {
284 return false;
285 }
286 return true;
287
288 }
289
290 public static class ExternalViewBalancedVerifier implements ZkVerifier
291 {
292 ZkClient _client;
293 String _clusterName;
294 String _resourceName;
295
296 public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName)
297 {
298 _client = client;
299 _clusterName = clusterName;
300 _resourceName = resourceName;
301 }
302 @Override
303 public boolean verify()
304 {
305 HelixDataAccessor accessor = new ZKHelixDataAccessor( _clusterName, new ZkBaseDataAccessor(_client));
306 Builder keyBuilder = accessor.keyBuilder();
307 int numberOfPartitions = accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields().size();
308 ClusterDataCache cache = new ClusterDataCache();
309 cache.refresh(accessor);
310 String masterValue = cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef()).getStatesPriorityList().get(0);
311 int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
312 String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
313 int instances = 0;
314 for(String liveInstanceName : cache.getLiveInstances().keySet())
315 {
316 if(cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag))
317 {
318 instances ++;
319 }
320 }
321 if(instances == 0)
322 {
323 instances = cache.getLiveInstances().size();
324 }
325 ExternalView ev = accessor.getProperty(keyBuilder.externalView(_resourceName));
326 if(ev == null)
327 {
328 return false;
329 }
330 return verifyBalanceExternalView(ev.getRecord(), numberOfPartitions, masterValue, replicas, instances);
331 }
332
333 @Override
334 public ZkClient getZkClient()
335 {
336 return _client;
337 }
338
339 @Override
340 public String getClusterName()
341 {
342 return _clusterName;
343 }
344
345 }
346 }