1 package org.apache.helix.controller.stages;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23 import java.util.List;
24
25 import org.apache.helix.HelixAdmin;
26 import org.apache.helix.HelixDataAccessor;
27 import org.apache.helix.HelixManager;
28 import org.apache.helix.TestHelper;
29 import org.apache.helix.ZNRecord;
30 import org.apache.helix.ZkUnitTestBase;
31 import org.apache.helix.PropertyKey.Builder;
32 import org.apache.helix.controller.HelixControllerMain;
33 import org.apache.helix.controller.pipeline.Pipeline;
34 import org.apache.helix.controller.stages.AttributeName;
35 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
36 import org.apache.helix.controller.stages.ClusterEvent;
37 import org.apache.helix.controller.stages.CurrentStateComputationStage;
38 import org.apache.helix.controller.stages.MessageGenerationPhase;
39 import org.apache.helix.controller.stages.MessageSelectionStage;
40 import org.apache.helix.controller.stages.MessageSelectionStageOutput;
41 import org.apache.helix.controller.stages.MessageThrottleStage;
42 import org.apache.helix.controller.stages.ReadClusterDataStage;
43 import org.apache.helix.controller.stages.ResourceComputationStage;
44 import org.apache.helix.controller.stages.TaskAssignmentStage;
45 import org.apache.helix.manager.zk.ZKHelixAdmin;
46 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
47 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
48 import org.apache.helix.model.CurrentState;
49 import org.apache.helix.model.Message;
50 import org.apache.helix.model.Partition;
51 import org.apache.helix.model.Message.Attributes;
52 import org.apache.log4j.Logger;
53 import org.testng.Assert;
54 import org.testng.annotations.Test;
55
56
57 public class TestRebalancePipeline extends ZkUnitTestBase
58 {
59 private static final Logger LOG =
60 Logger.getLogger(TestRebalancePipeline.class.getName());
61 final String _className = getShortClassName();
62
63 @Test
64 public void testDuplicateMsg()
65 {
66 String clusterName = "CLUSTER_" + _className + "_dup";
67 System.out.println("START " + clusterName + " at "
68 + new Date(System.currentTimeMillis()));
69
70 HelixDataAccessor accessor =
71 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
72
73 HelixManager manager = new DummyClusterManager(clusterName, accessor);
74 ClusterEvent event = new ClusterEvent("testEvent");
75 event.addAttribute("helixmanager", manager);
76
77 final String resourceName = "testResource_dup";
78 String[] resourceGroups = new String[] { resourceName };
79
80
81 setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
82 setupLiveInstances(clusterName, new int[] { 0, 1 });
83 setupStateModel(clusterName);
84
85
86 Pipeline dataRefresh = new Pipeline();
87 dataRefresh.addStage(new ReadClusterDataStage());
88
89
90 Pipeline rebalancePipeline = new Pipeline();
91 rebalancePipeline.addStage(new ResourceComputationStage());
92 rebalancePipeline.addStage(new CurrentStateComputationStage());
93 rebalancePipeline.addStage(new BestPossibleStateCalcStage());
94 rebalancePipeline.addStage(new MessageGenerationPhase());
95 rebalancePipeline.addStage(new MessageSelectionStage());
96 rebalancePipeline.addStage(new MessageThrottleStage());
97 rebalancePipeline.addStage(new TaskAssignmentStage());
98
99
100 setCurrentState(clusterName,
101 "localhost_0",
102 resourceName,
103 resourceName + "_0",
104 "session_0",
105 "OFFLINE");
106 setCurrentState(clusterName,
107 "localhost_1",
108 resourceName,
109 resourceName + "_0",
110 "session_1",
111 "SLAVE");
112
113 runPipeline(event, dataRefresh);
114 runPipeline(event, rebalancePipeline);
115 MessageSelectionStageOutput msgSelOutput =
116 event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
117 List<Message> messages =
118 msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
119 Assert.assertEquals(messages.size(),
120 1,
121 "Should output 1 message: OFFLINE-SLAVE for node0");
122 Message message = messages.get(0);
123 Assert.assertEquals(message.getFromState(), "OFFLINE");
124 Assert.assertEquals(message.getToState(), "SLAVE");
125 Assert.assertEquals(message.getTgtName(), "localhost_0");
126
127
128
129 setCurrentState(clusterName,
130 "localhost_0",
131 resourceName,
132 resourceName + "_0",
133 "session_1",
134 "SLAVE");
135
136 runPipeline(event, dataRefresh);
137 runPipeline(event, rebalancePipeline);
138 msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
139 messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
140 Assert.assertEquals(messages.size(),
141 0,
142 "Should NOT output 1 message: SLAVE-MASTER for node1");
143
144 System.out.println("END " + clusterName + " at "
145 + new Date(System.currentTimeMillis()));
146
147 }
148
149 @Test
150 public void testMsgTriggeredRebalance() throws Exception
151 {
152 String clusterName = "CLUSTER_" + _className + "_msgTrigger";
153 System.out.println("START " + clusterName + " at "
154 + new Date(System.currentTimeMillis()));
155
156 HelixDataAccessor accessor =
157 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
158 HelixManager manager = new DummyClusterManager(clusterName, accessor);
159 ClusterEvent event = new ClusterEvent("testEvent");
160
161 final String resourceName = "testResource_dup";
162 String[] resourceGroups = new String[] { resourceName };
163
164 TestHelper.setupEmptyCluster(_gZkClient, clusterName);
165
166
167
168 setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
169 setupStateModel(clusterName);
170 setupInstances(clusterName, new int[] { 0, 1 });
171 setupLiveInstances(clusterName, new int[] { 0, 1 });
172
173 TestHelper.startController(clusterName,
174 "controller_0",
175 ZK_ADDR,
176 HelixControllerMain.STANDALONE);
177
178
179 Thread.sleep(1000);
180
181 Builder keyBuilder = accessor.keyBuilder();
182 List<String> messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
183 Assert.assertEquals(messages.size(), 1);
184 messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
185 Assert.assertEquals(messages.size(), 1);
186
187
188
189
190
191 setCurrentState(clusterName,
192 "localhost_0",
193 resourceName,
194 resourceName + "_0",
195 "session_0",
196 "SLAVE");
197 setCurrentState(clusterName,
198 "localhost_1",
199 resourceName,
200 resourceName + "_0",
201 "session_1",
202 "SLAVE");
203 Thread.sleep(1000);
204 messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
205 Assert.assertEquals(messages.size(), 1);
206
207 messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
208 Assert.assertEquals(messages.size(), 1);
209
210
211
212
213 messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
214 accessor.removeProperty(keyBuilder.message("localhost_0", messages.get(0)));
215 Thread.sleep(1000);
216
217 messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
218 Assert.assertEquals(messages.size(), 1);
219 ZNRecord msg =
220 accessor.getProperty(keyBuilder.message("localhost_0", messages.get(0)))
221 .getRecord();
222 String toState = msg.getSimpleField(Attributes.TO_STATE.toString());
223 Assert.assertEquals(toState, "MASTER");
224
225 System.out.println("END " + clusterName + " at "
226 + new Date(System.currentTimeMillis()));
227
228 }
229
230 @Test
231 public void testChangeIdealStateWithPendingMsg()
232 {
233 String clusterName = "CLUSTER_" + _className + "_pending";
234 System.out.println("START " + clusterName + " at "
235 + new Date(System.currentTimeMillis()));
236
237 HelixDataAccessor accessor =
238 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
239 HelixManager manager = new DummyClusterManager(clusterName, accessor);
240 ClusterEvent event = new ClusterEvent("testEvent");
241 event.addAttribute("helixmanager", manager);
242
243 final String resourceName = "testResource_pending";
244 String[] resourceGroups = new String[] { resourceName };
245
246
247 setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
248 setupLiveInstances(clusterName, new int[] { 0, 1 });
249 setupStateModel(clusterName);
250
251
252 Pipeline dataRefresh = new Pipeline();
253 dataRefresh.addStage(new ReadClusterDataStage());
254
255
256 Pipeline rebalancePipeline = new Pipeline();
257 rebalancePipeline.addStage(new ResourceComputationStage());
258 rebalancePipeline.addStage(new CurrentStateComputationStage());
259 rebalancePipeline.addStage(new BestPossibleStateCalcStage());
260 rebalancePipeline.addStage(new MessageGenerationPhase());
261 rebalancePipeline.addStage(new MessageSelectionStage());
262 rebalancePipeline.addStage(new MessageThrottleStage());
263 rebalancePipeline.addStage(new TaskAssignmentStage());
264
265
266 setCurrentState(clusterName,
267 "localhost_0",
268 resourceName,
269 resourceName + "_0",
270 "session_0",
271 "OFFLINE");
272 setCurrentState(clusterName,
273 "localhost_1",
274 resourceName,
275 resourceName + "_0",
276 "session_1",
277 "SLAVE");
278
279 runPipeline(event, dataRefresh);
280 runPipeline(event, rebalancePipeline);
281 MessageSelectionStageOutput msgSelOutput =
282 event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
283 List<Message> messages =
284 msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
285 Assert.assertEquals(messages.size(),
286 1,
287 "Should output 1 message: OFFLINE-SLAVE for node0");
288 Message message = messages.get(0);
289 Assert.assertEquals(message.getFromState(), "OFFLINE");
290 Assert.assertEquals(message.getToState(), "SLAVE");
291 Assert.assertEquals(message.getTgtName(), "localhost_0");
292
293
294
295 HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
296 admin.dropResource(clusterName, resourceName);
297
298 runPipeline(event, dataRefresh);
299 runPipeline(event, rebalancePipeline);
300 msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
301 messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
302 Assert.assertEquals(messages.size(),
303 1,
304 "Should output only 1 message: OFFLINE->DROPPED for localhost_1");
305
306 message = messages.get(0);
307 Assert.assertEquals(message.getFromState(), "SLAVE");
308 Assert.assertEquals(message.getToState(), "OFFLINE");
309 Assert.assertEquals(message.getTgtName(), "localhost_1");
310
311
312
313 Builder keyBuilder = accessor.keyBuilder();
314 List<String> msgIds = accessor.getChildNames(keyBuilder.messages("localhost_0"));
315 accessor.removeProperty(keyBuilder.message("localhost_0", msgIds.get(0)));
316 runPipeline(event, dataRefresh);
317 runPipeline(event, rebalancePipeline);
318 msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
319 messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
320 Assert.assertEquals(messages.size(),
321 1,
322 "Should output 1 message: OFFLINE->DROPPED for localhost_0");
323 message = messages.get(0);
324 Assert.assertEquals(message.getFromState(), "OFFLINE");
325 Assert.assertEquals(message.getToState(), "DROPPED");
326 Assert.assertEquals(message.getTgtName(), "localhost_0");
327
328 System.out.println("END " + clusterName + " at "
329 + new Date(System.currentTimeMillis()));
330
331 }
332
333 @Test
334 public void testMasterXfer()
335 {
336 String clusterName = "CLUSTER_" + _className + "_xfer";
337
338 System.out.println("START " + clusterName + " at "
339 + new Date(System.currentTimeMillis()));
340
341 HelixDataAccessor accessor =
342 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
343 HelixManager manager = new DummyClusterManager(clusterName, accessor);
344 ClusterEvent event = new ClusterEvent("testEvent");
345 event.addAttribute("helixmanager", manager);
346
347 final String resourceName = "testResource_xfer";
348 String[] resourceGroups = new String[] { resourceName };
349
350
351 setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
352 setupLiveInstances(clusterName, new int[] { 1 });
353 setupStateModel(clusterName);
354
355
356 Pipeline dataRefresh = new Pipeline();
357 dataRefresh.addStage(new ReadClusterDataStage());
358
359
360 Pipeline rebalancePipeline = new Pipeline();
361 rebalancePipeline.addStage(new ResourceComputationStage());
362 rebalancePipeline.addStage(new CurrentStateComputationStage());
363 rebalancePipeline.addStage(new BestPossibleStateCalcStage());
364 rebalancePipeline.addStage(new MessageGenerationPhase());
365 rebalancePipeline.addStage(new MessageSelectionStage());
366 rebalancePipeline.addStage(new MessageThrottleStage());
367 rebalancePipeline.addStage(new TaskAssignmentStage());
368
369
370 setCurrentState(clusterName,
371 "localhost_1",
372 resourceName,
373 resourceName + "_0",
374 "session_1",
375 "SLAVE");
376
377 runPipeline(event, dataRefresh);
378 runPipeline(event, rebalancePipeline);
379 MessageSelectionStageOutput msgSelOutput =
380 event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
381 List<Message> messages =
382 msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
383 Assert.assertEquals(messages.size(),
384 1,
385 "Should output 1 message: SLAVE-MASTER for node1");
386 Message message = messages.get(0);
387 Assert.assertEquals(message.getFromState(), "SLAVE");
388 Assert.assertEquals(message.getToState(), "MASTER");
389 Assert.assertEquals(message.getTgtName(), "localhost_1");
390
391
392
393 setupLiveInstances(clusterName, new int[] { 0 });
394 setCurrentState(clusterName,
395 "localhost_0",
396 resourceName,
397 resourceName + "_0",
398 "session_0",
399 "SLAVE");
400
401 runPipeline(event, dataRefresh);
402 runPipeline(event, rebalancePipeline);
403 msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
404 messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
405 Assert.assertEquals(messages.size(),
406 0,
407 "Should NOT output 1 message: SLAVE-MASTER for node0");
408
409 System.out.println("END " + clusterName + " at "
410 + new Date(System.currentTimeMillis()));
411
412 }
413
414 protected void setCurrentState(String clusterName,
415 String instance,
416 String resourceGroupName,
417 String resourceKey,
418 String sessionId,
419 String state)
420 {
421 ZKHelixDataAccessor accessor =
422 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
423 Builder keyBuilder = accessor.keyBuilder();
424
425 CurrentState curState = new CurrentState(resourceGroupName);
426 curState.setState(resourceKey, state);
427 curState.setSessionId(sessionId);
428 curState.setStateModelDefRef("MasterSlave");
429 accessor.setProperty(keyBuilder.currentState(instance, sessionId, resourceGroupName),
430 curState);
431 }
432 }