View Javadoc

1   package org.apache.helix.controller.stages;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.List;
24  
25  import org.apache.helix.HelixAdmin;
26  import org.apache.helix.HelixDataAccessor;
27  import org.apache.helix.HelixManager;
28  import org.apache.helix.TestHelper;
29  import org.apache.helix.ZNRecord;
30  import org.apache.helix.ZkUnitTestBase;
31  import org.apache.helix.PropertyKey.Builder;
32  import org.apache.helix.controller.HelixControllerMain;
33  import org.apache.helix.controller.pipeline.Pipeline;
34  import org.apache.helix.controller.stages.AttributeName;
35  import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
36  import org.apache.helix.controller.stages.ClusterEvent;
37  import org.apache.helix.controller.stages.CurrentStateComputationStage;
38  import org.apache.helix.controller.stages.MessageGenerationPhase;
39  import org.apache.helix.controller.stages.MessageSelectionStage;
40  import org.apache.helix.controller.stages.MessageSelectionStageOutput;
41  import org.apache.helix.controller.stages.MessageThrottleStage;
42  import org.apache.helix.controller.stages.ReadClusterDataStage;
43  import org.apache.helix.controller.stages.ResourceComputationStage;
44  import org.apache.helix.controller.stages.TaskAssignmentStage;
45  import org.apache.helix.manager.zk.ZKHelixAdmin;
46  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
47  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
48  import org.apache.helix.model.CurrentState;
49  import org.apache.helix.model.Message;
50  import org.apache.helix.model.Partition;
51  import org.apache.helix.model.Message.Attributes;
52  import org.apache.log4j.Logger;
53  import org.testng.Assert;
54  import org.testng.annotations.Test;
55  
56  
57  public class TestRebalancePipeline extends ZkUnitTestBase
58  {
59    private static final Logger LOG        =
60                                               Logger.getLogger(TestRebalancePipeline.class.getName());
61    final String                _className = getShortClassName();
62  
63    @Test
64    public void testDuplicateMsg()
65    {
66      String clusterName = "CLUSTER_" + _className + "_dup";
67      System.out.println("START " + clusterName + " at "
68          + new Date(System.currentTimeMillis()));
69  
70      HelixDataAccessor accessor =
71          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
72  
73      HelixManager manager = new DummyClusterManager(clusterName, accessor);
74      ClusterEvent event = new ClusterEvent("testEvent");
75      event.addAttribute("helixmanager", manager);
76  
77      final String resourceName = "testResource_dup";
78      String[] resourceGroups = new String[] { resourceName };
79      // ideal state: node0 is MASTER, node1 is SLAVE
80      // replica=2 means 1 master and 1 slave
81      setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
82      setupLiveInstances(clusterName, new int[] { 0, 1 });
83      setupStateModel(clusterName);
84  
85      // cluster data cache refresh pipeline
86      Pipeline dataRefresh = new Pipeline();
87      dataRefresh.addStage(new ReadClusterDataStage());
88  
89      // rebalance pipeline
90      Pipeline rebalancePipeline = new Pipeline();
91      rebalancePipeline.addStage(new ResourceComputationStage());
92      rebalancePipeline.addStage(new CurrentStateComputationStage());
93      rebalancePipeline.addStage(new BestPossibleStateCalcStage());
94      rebalancePipeline.addStage(new MessageGenerationPhase());
95      rebalancePipeline.addStage(new MessageSelectionStage());
96      rebalancePipeline.addStage(new MessageThrottleStage());
97      rebalancePipeline.addStage(new TaskAssignmentStage());
98  
99      // round1: set node0 currentState to OFFLINE and node1 currentState to OFFLINE
100     setCurrentState(clusterName,
101                     "localhost_0",
102                     resourceName,
103                     resourceName + "_0",
104                     "session_0",
105                     "OFFLINE");
106     setCurrentState(clusterName,
107                     "localhost_1",
108                     resourceName,
109                     resourceName + "_0",
110                     "session_1",
111                     "SLAVE");
112 
113     runPipeline(event, dataRefresh);
114     runPipeline(event, rebalancePipeline);
115     MessageSelectionStageOutput msgSelOutput =
116         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
117     List<Message> messages =
118         msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
119     Assert.assertEquals(messages.size(),
120                         1,
121                         "Should output 1 message: OFFLINE-SLAVE for node0");
122     Message message = messages.get(0);
123     Assert.assertEquals(message.getFromState(), "OFFLINE");
124     Assert.assertEquals(message.getToState(), "SLAVE");
125     Assert.assertEquals(message.getTgtName(), "localhost_0");
126 
127     // round2: updates node0 currentState to SLAVE but keep the
128     // message, make sure controller should not send S->M until removal is done
129     setCurrentState(clusterName,
130                     "localhost_0",
131                     resourceName,
132                     resourceName + "_0",
133                     "session_1",
134                     "SLAVE");
135 
136     runPipeline(event, dataRefresh);
137     runPipeline(event, rebalancePipeline);
138     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
139     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
140     Assert.assertEquals(messages.size(),
141                         0,
142                         "Should NOT output 1 message: SLAVE-MASTER for node1");
143 
144     System.out.println("END " + clusterName + " at "
145         + new Date(System.currentTimeMillis()));
146 
147   }
148 
149   @Test
150   public void testMsgTriggeredRebalance() throws Exception
151   {
152     String clusterName = "CLUSTER_" + _className + "_msgTrigger";
153     System.out.println("START " + clusterName + " at "
154         + new Date(System.currentTimeMillis()));
155 
156     HelixDataAccessor accessor =
157         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
158     HelixManager manager = new DummyClusterManager(clusterName, accessor);
159     ClusterEvent event = new ClusterEvent("testEvent");
160 
161     final String resourceName = "testResource_dup";
162     String[] resourceGroups = new String[] { resourceName };
163 
164     TestHelper.setupEmptyCluster(_gZkClient, clusterName);
165 
166     // ideal state: node0 is MASTER, node1 is SLAVE
167     // replica=2 means 1 master and 1 slave
168     setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
169     setupStateModel(clusterName);
170     setupInstances(clusterName, new int[] { 0, 1 });
171     setupLiveInstances(clusterName, new int[] { 0, 1 });
172 
173     TestHelper.startController(clusterName,
174                                "controller_0",
175                                ZK_ADDR,
176                                HelixControllerMain.STANDALONE);
177 
178     // round1: controller sends O->S to both node0 and node1
179     Thread.sleep(1000);
180 
181     Builder keyBuilder = accessor.keyBuilder();
182     List<String> messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
183     Assert.assertEquals(messages.size(), 1);
184     messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
185     Assert.assertEquals(messages.size(), 1);
186 
187     // round2: node0 and node1 update current states but not removing messages
188     // controller's rebalance pipeline should be triggered but since messages are not
189     // removed
190     // no new messages will be sent
191     setCurrentState(clusterName,
192                     "localhost_0",
193                     resourceName,
194                     resourceName + "_0",
195                     "session_0",
196                     "SLAVE");
197     setCurrentState(clusterName,
198                     "localhost_1",
199                     resourceName,
200                     resourceName + "_0",
201                     "session_1",
202                     "SLAVE");
203     Thread.sleep(1000);
204     messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
205     Assert.assertEquals(messages.size(), 1);
206 
207     messages = accessor.getChildNames(keyBuilder.messages("localhost_1"));
208     Assert.assertEquals(messages.size(), 1);
209 
210     // round3: node0 removes message and controller's rebalance pipeline should be
211     // triggered
212     // and sends S->M to node0
213     messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
214     accessor.removeProperty(keyBuilder.message("localhost_0", messages.get(0)));
215     Thread.sleep(1000);
216 
217     messages = accessor.getChildNames(keyBuilder.messages("localhost_0"));
218     Assert.assertEquals(messages.size(), 1);
219     ZNRecord msg =
220         accessor.getProperty(keyBuilder.message("localhost_0", messages.get(0)))
221                 .getRecord();
222     String toState = msg.getSimpleField(Attributes.TO_STATE.toString());
223     Assert.assertEquals(toState, "MASTER");
224 
225     System.out.println("END " + clusterName + " at "
226         + new Date(System.currentTimeMillis()));
227 
228   }
229 
230   @Test
231   public void testChangeIdealStateWithPendingMsg()
232   {
233     String clusterName = "CLUSTER_" + _className + "_pending";
234     System.out.println("START " + clusterName + " at "
235         + new Date(System.currentTimeMillis()));
236 
237     HelixDataAccessor accessor =
238         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
239     HelixManager manager = new DummyClusterManager(clusterName, accessor);
240     ClusterEvent event = new ClusterEvent("testEvent");
241     event.addAttribute("helixmanager", manager);
242 
243     final String resourceName = "testResource_pending";
244     String[] resourceGroups = new String[] { resourceName };
245     // ideal state: node0 is MASTER, node1 is SLAVE
246     // replica=2 means 1 master and 1 slave
247     setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
248     setupLiveInstances(clusterName, new int[] { 0, 1 });
249     setupStateModel(clusterName);
250 
251     // cluster data cache refresh pipeline
252     Pipeline dataRefresh = new Pipeline();
253     dataRefresh.addStage(new ReadClusterDataStage());
254 
255     // rebalance pipeline
256     Pipeline rebalancePipeline = new Pipeline();
257     rebalancePipeline.addStage(new ResourceComputationStage());
258     rebalancePipeline.addStage(new CurrentStateComputationStage());
259     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
260     rebalancePipeline.addStage(new MessageGenerationPhase());
261     rebalancePipeline.addStage(new MessageSelectionStage());
262     rebalancePipeline.addStage(new MessageThrottleStage());
263     rebalancePipeline.addStage(new TaskAssignmentStage());
264 
265     // round1: set node0 currentState to OFFLINE and node1 currentState to SLAVE
266     setCurrentState(clusterName,
267                     "localhost_0",
268                     resourceName,
269                     resourceName + "_0",
270                     "session_0",
271                     "OFFLINE");
272     setCurrentState(clusterName,
273                     "localhost_1",
274                     resourceName,
275                     resourceName + "_0",
276                     "session_1",
277                     "SLAVE");
278 
279     runPipeline(event, dataRefresh);
280     runPipeline(event, rebalancePipeline);
281     MessageSelectionStageOutput msgSelOutput =
282         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
283     List<Message> messages =
284         msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
285     Assert.assertEquals(messages.size(),
286                         1,
287                         "Should output 1 message: OFFLINE-SLAVE for node0");
288     Message message = messages.get(0);
289     Assert.assertEquals(message.getFromState(), "OFFLINE");
290     Assert.assertEquals(message.getToState(), "SLAVE");
291     Assert.assertEquals(message.getTgtName(), "localhost_0");
292 
293     // round2: drop resource, but keep the
294     // message, make sure controller should not send O->DROPPEDN until O->S is done
295     HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
296     admin.dropResource(clusterName, resourceName);
297 
298     runPipeline(event, dataRefresh);
299     runPipeline(event, rebalancePipeline);
300     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
301     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
302     Assert.assertEquals(messages.size(),
303                         1,
304                         "Should output only 1 message: OFFLINE->DROPPED for localhost_1");
305 
306     message = messages.get(0);
307     Assert.assertEquals(message.getFromState(), "SLAVE");
308     Assert.assertEquals(message.getToState(), "OFFLINE");
309     Assert.assertEquals(message.getTgtName(), "localhost_1");
310 
311     // round3: remove O->S for localhost_0, controller should now send O->DROPPED to
312     // localhost_0
313     Builder keyBuilder = accessor.keyBuilder();
314     List<String> msgIds = accessor.getChildNames(keyBuilder.messages("localhost_0"));
315     accessor.removeProperty(keyBuilder.message("localhost_0", msgIds.get(0)));
316     runPipeline(event, dataRefresh);
317     runPipeline(event, rebalancePipeline);
318     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
319     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
320     Assert.assertEquals(messages.size(),
321                         1,
322                         "Should output 1 message: OFFLINE->DROPPED for localhost_0");
323     message = messages.get(0);
324     Assert.assertEquals(message.getFromState(), "OFFLINE");
325     Assert.assertEquals(message.getToState(), "DROPPED");
326     Assert.assertEquals(message.getTgtName(), "localhost_0");
327 
328     System.out.println("END " + clusterName + " at "
329         + new Date(System.currentTimeMillis()));
330 
331   }
332 
333   @Test
334   public void testMasterXfer()
335   {
336     String clusterName = "CLUSTER_" + _className + "_xfer";
337 
338     System.out.println("START " + clusterName + " at "
339         + new Date(System.currentTimeMillis()));
340 
341     HelixDataAccessor accessor =
342         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
343     HelixManager manager = new DummyClusterManager(clusterName, accessor);
344     ClusterEvent event = new ClusterEvent("testEvent");
345     event.addAttribute("helixmanager", manager);
346 
347     final String resourceName = "testResource_xfer";
348     String[] resourceGroups = new String[] { resourceName };
349     // ideal state: node0 is MASTER, node1 is SLAVE
350     // replica=2 means 1 master and 1 slave
351     setupIdealState(clusterName, new int[] { 0, 1 }, resourceGroups, 1, 2);
352     setupLiveInstances(clusterName, new int[] { 1 });
353     setupStateModel(clusterName);
354 
355     // cluster data cache refresh pipeline
356     Pipeline dataRefresh = new Pipeline();
357     dataRefresh.addStage(new ReadClusterDataStage());
358 
359     // rebalance pipeline
360     Pipeline rebalancePipeline = new Pipeline();
361     rebalancePipeline.addStage(new ResourceComputationStage());
362     rebalancePipeline.addStage(new CurrentStateComputationStage());
363     rebalancePipeline.addStage(new BestPossibleStateCalcStage());
364     rebalancePipeline.addStage(new MessageGenerationPhase());
365     rebalancePipeline.addStage(new MessageSelectionStage());
366     rebalancePipeline.addStage(new MessageThrottleStage());
367     rebalancePipeline.addStage(new TaskAssignmentStage());
368 
369     // round1: set node1 currentState to SLAVE
370     setCurrentState(clusterName,
371                     "localhost_1",
372                     resourceName,
373                     resourceName + "_0",
374                     "session_1",
375                     "SLAVE");
376 
377     runPipeline(event, dataRefresh);
378     runPipeline(event, rebalancePipeline);
379     MessageSelectionStageOutput msgSelOutput =
380         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
381     List<Message> messages =
382         msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
383     Assert.assertEquals(messages.size(),
384                         1,
385                         "Should output 1 message: SLAVE-MASTER for node1");
386     Message message = messages.get(0);
387     Assert.assertEquals(message.getFromState(), "SLAVE");
388     Assert.assertEquals(message.getToState(), "MASTER");
389     Assert.assertEquals(message.getTgtName(), "localhost_1");
390 
391     // round2: updates node0 currentState to SLAVE but keep the
392     // message, make sure controller should not send S->M until removal is done
393     setupLiveInstances(clusterName, new int[] { 0 });
394     setCurrentState(clusterName,
395                     "localhost_0",
396                     resourceName,
397                     resourceName + "_0",
398                     "session_0",
399                     "SLAVE");
400 
401     runPipeline(event, dataRefresh);
402     runPipeline(event, rebalancePipeline);
403     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
404     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
405     Assert.assertEquals(messages.size(),
406                         0,
407                         "Should NOT output 1 message: SLAVE-MASTER for node0");
408 
409     System.out.println("END " + clusterName + " at "
410         + new Date(System.currentTimeMillis()));
411 
412   }
413 
414   protected void setCurrentState(String clusterName,
415                                  String instance,
416                                  String resourceGroupName,
417                                  String resourceKey,
418                                  String sessionId,
419                                  String state)
420   {
421     ZKHelixDataAccessor accessor =
422         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
423     Builder keyBuilder = accessor.keyBuilder();
424 
425     CurrentState curState = new CurrentState(resourceGroupName);
426     curState.setState(resourceKey, state);
427     curState.setSessionId(sessionId);
428     curState.setStateModelDefRef("MasterSlave");
429     accessor.setProperty(keyBuilder.currentState(instance, sessionId, resourceGroupName),
430                          curState);
431   }
432 }