View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.HashMap;
24  import java.util.Map;
25  
26  import org.apache.helix.TestHelper;
27  import org.apache.helix.ZNRecord;
28  import org.apache.helix.ZkTestHelper;
29  import org.apache.helix.PropertyKey.Builder;
30  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
31  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
32  import org.apache.helix.mock.controller.ClusterController;
33  import org.apache.helix.mock.participant.MockParticipant;
34  import org.apache.helix.model.IdealState;
35  import org.apache.helix.model.IdealState.IdealStateModeProperty;
36  import org.apache.helix.tools.ClusterSetup;
37  import org.apache.helix.tools.ClusterStateVerifier;
38  import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
39  import org.testng.Assert;
40  import org.testng.annotations.Test;
41  
42  
43  
44  public class TestDisable extends ZkIntegrationTestBase
45  {
46  
47    @Test
48    public void testDisableNodeCustomIS() throws Exception
49    {
50      // Logger.getRootLogger().setLevel(Level.INFO);
51      String className = TestHelper.getTestClassName();
52      String methodName = TestHelper.getTestMethodName();
53      String clusterName = className + "_" + methodName;
54      final int n = 5;
55      String disableNode = "localhost_12918";
56  
57      System.out.println("START " + clusterName + " at "
58          + new Date(System.currentTimeMillis()));
59  
60      MockParticipant[] participants = new MockParticipant[n];
61  
62      TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
63                              "localhost", // participant name prefix
64                              "TestDB", // resource name prefix
65                              1, // resources
66                              8, // partitions per resource
67                              n, // number of nodes
68                              3, // replicas
69                              "MasterSlave",
70                              true); // do rebalance
71  
72      // set ideal state to customized mode
73      ZkBaseDataAccessor<ZNRecord> baseAccessor =
74          new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
75      ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
76      Builder keyBuilder = accessor.keyBuilder();
77      IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
78      idealState.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
79      accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
80  
81      
82      // start controller
83      ClusterController controller =
84          new ClusterController(clusterName, "controller_0", ZK_ADDR);
85      controller.syncStart();
86      
87      // start participants
88      for (int i = 0; i < n; i++)
89      {
90        String instanceName = "localhost_" + (12918 + i);
91  
92        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
93        participants[i].syncStart();
94      }
95  
96      boolean result =
97          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
98                                                                                   clusterName));
99      Assert.assertTrue(result);
100 
101     // disable localhost_12918
102     String command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName + 
103         " " + disableNode + " false";
104     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
105     result =
106         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
107                                                                                  clusterName));
108     Assert.assertTrue(result);
109     
110     // make sure localhost_12918 is in OFFLINE state
111     Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
112     Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
113     expectInstanceStateMap.put(disableNode, "OFFLINE");
114     expectStateMap.put(".*", expectInstanceStateMap);
115     result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
116     Assert.assertTrue(result, disableNode + " should be in OFFLINE");
117     
118     // re-enable localhost_12918
119     command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName + 
120         " " + disableNode + " true";
121     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
122     result =
123         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
124                                                                                  clusterName));
125     Assert.assertTrue(result);
126 
127     // make sure localhost_12918 is NOT in OFFLINE state
128     result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
129     Assert.assertTrue(result, disableNode + " should NOT be in OFFLINE");
130     
131     // clean up
132     // wait for all zk callbacks done
133     Thread.sleep(1000);
134     controller.syncStop();
135     for (int i = 0; i < 5; i++)
136     {
137       participants[i].syncStop();
138     }
139 
140     System.out.println("END " + clusterName + " at "
141         + new Date(System.currentTimeMillis())); 
142   }
143   
144   @Test
145   public void testDisableNodeAutoIS() throws Exception
146   {
147     // Logger.getRootLogger().setLevel(Level.INFO);
148     String className = TestHelper.getTestClassName();
149     String methodName = TestHelper.getTestMethodName();
150     String clusterName = className + "_" + methodName;
151     final int n = 5;
152     String disableNode = "localhost_12919";
153 
154 
155     System.out.println("START " + clusterName + " at "
156         + new Date(System.currentTimeMillis()));
157 
158     MockParticipant[] participants = new MockParticipant[n];
159 
160     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
161                             "localhost", // participant name prefix
162                             "TestDB", // resource name prefix
163                             1, // resources
164                             8, // partitions per resource
165                             n, // number of nodes
166                             3, // replicas
167                             "MasterSlave",
168                             true); // do rebalance
169 
170     // start controller
171     ClusterController controller =
172         new ClusterController(clusterName, "controller_0", ZK_ADDR);
173     controller.syncStart();
174     
175     // start participants
176     for (int i = 0; i < n; i++)
177     {
178       String instanceName = "localhost_" + (12918 + i);
179 
180       participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
181       participants[i].syncStart();
182     }
183 
184     boolean result =
185         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
186                                                                                  clusterName));
187     Assert.assertTrue(result);
188 
189     // disable localhost_12919
190     String command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName + 
191         " " + disableNode + " false";
192     ClusterSetup.processCommandLineArgs(command.split(" "));
193     result =
194         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
195                                                                                  clusterName));
196     Assert.assertTrue(result);
197     
198     // make sure localhost_12919 is in OFFLINE state
199     Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
200     Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
201     expectInstanceStateMap.put(disableNode, "OFFLINE");
202     expectStateMap.put(".*", expectInstanceStateMap);
203     result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
204     Assert.assertTrue(result, disableNode + " should be in OFFLINE");
205     
206     // re-enable localhost_12919
207     command = "--zkSvr " + ZK_ADDR +" --enableInstance " + clusterName + 
208         " " + disableNode + " true";
209     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
210     result =
211         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
212                                                                                  clusterName));
213     Assert.assertTrue(result);
214 
215     // make sure localhost_12919 is NOT in OFFLINE state
216     result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
217     Assert.assertTrue(result, disableNode + " should NOT be in OFFLINE");
218     
219     // clean up
220     // wait for all zk callbacks done
221     Thread.sleep(1000);
222     controller.syncStop();
223     for (int i = 0; i < 5; i++)
224     {
225       participants[i].syncStop();
226     }
227 
228     System.out.println("END " + clusterName + " at "
229         + new Date(System.currentTimeMillis())); 
230   }
231   
232   @Test
233   public void testDisablePartitionCustomIS() throws Exception
234   {
235     // Logger.getRootLogger().setLevel(Level.INFO);
236     String className = TestHelper.getTestClassName();
237     String methodName = TestHelper.getTestMethodName();
238     String clusterName = className + "_" + methodName;
239     final int n = 5;
240 
241     System.out.println("START " + clusterName + " at "
242         + new Date(System.currentTimeMillis()));
243 
244     MockParticipant[] participants = new MockParticipant[n];
245 
246     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
247                             "localhost", // participant name prefix
248                             "TestDB", // resource name prefix
249                             1, // resources
250                             8, // partitions per resource
251                             n, // number of nodes
252                             3, // replicas
253                             "MasterSlave",
254                             true); // do rebalance
255 
256     // set ideal state to customized mode
257     ZkBaseDataAccessor<ZNRecord> baseAccessor =
258         new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
259     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
260     Builder keyBuilder = accessor.keyBuilder();
261     IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB0"));
262     idealState.setIdealStateMode(IdealStateModeProperty.CUSTOMIZED.toString());
263     accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
264 
265     
266     // start controller
267     ClusterController controller =
268         new ClusterController(clusterName, "controller_0", ZK_ADDR);
269     controller.syncStart();
270     
271     // start participants
272     for (int i = 0; i < n; i++)
273     {
274       String instanceName = "localhost_" + (12918 + i);
275 
276       participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
277       participants[i].syncStart();
278     }
279 
280     boolean result =
281         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
282                                                                                  clusterName));
283     Assert.assertTrue(result);
284 
285     // disable [TestDB0_0, TestDB0_5] on localhost_12919
286     String command = "--zkSvr " + ZK_ADDR +" --enablePartition false " + clusterName +
287         " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
288     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
289 
290     result =
291         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
292                                                                                  clusterName));
293     Assert.assertTrue(result);
294     
295     // make sure localhost_12918 is in OFFLINE state for [TestDB0_0, TestDB0_5]
296     Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
297     Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
298     expectInstanceStateMap.put("localhost_12919", "OFFLINE");
299     expectStateMap.put("TestDB0_0", expectInstanceStateMap);
300     expectStateMap.put("TestDB0_5", expectInstanceStateMap);
301     result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
302     Assert.assertTrue(result, "localhost_12919" + " should be in OFFLINE for [TestDB0_0, TestDB0_5]");
303 
304 
305     // re-enable localhost_12919 for [TestDB0_0, TestDB0_5]
306     command = "--zkSvr " + ZK_ADDR +" --enablePartition true " + clusterName +
307         " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
308     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
309     result =
310         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
311                                                                                  clusterName));
312     Assert.assertTrue(result);
313 
314     // make sure localhost_12919 is NOT in OFFLINE state for [TestDB0_0, TestDB0_5]
315     result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
316     Assert.assertTrue(result,  "localhost_12919" + " should NOT be in OFFLINE");
317 
318     
319     // clean up
320     // wait for all zk callbacks done
321     Thread.sleep(1000);
322     controller.syncStop();
323     for (int i = 0; i < 5; i++)
324     {
325       participants[i].syncStop();
326     }
327 
328     System.out.println("END " + clusterName + " at "
329         + new Date(System.currentTimeMillis())); 
330   }
331   
332   @Test
333   public void testDisablePartitionAutoIS() throws Exception
334   {
335     // Logger.getRootLogger().setLevel(Level.INFO);
336     String className = TestHelper.getTestClassName();
337     String methodName = TestHelper.getTestMethodName();
338     String clusterName = className + "_" + methodName;
339     final int n = 5;
340 
341     System.out.println("START " + clusterName + " at "
342         + new Date(System.currentTimeMillis()));
343 
344     MockParticipant[] participants = new MockParticipant[n];
345 
346     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
347                             "localhost", // participant name prefix
348                             "TestDB", // resource name prefix
349                             1, // resources
350                             8, // partitions per resource
351                             n, // number of nodes
352                             3, // replicas
353                             "MasterSlave",
354                             true); // do rebalance
355 
356     // start controller
357     ClusterController controller =
358         new ClusterController(clusterName, "controller_0", ZK_ADDR);
359     controller.syncStart();
360     
361     // start participants
362     for (int i = 0; i < n; i++)
363     {
364       String instanceName = "localhost_" + (12918 + i);
365 
366       participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
367       participants[i].syncStart();
368     }
369 
370     boolean result =
371         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
372                                                                                  clusterName));
373     Assert.assertTrue(result);
374 
375     // disable [TestDB0_0, TestDB0_5] on localhost_12919
376     String command = "--zkSvr " + ZK_ADDR +" --enablePartition false " + clusterName +
377         " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
378     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
379 
380     result =
381         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
382                                                                                  clusterName));
383     Assert.assertTrue(result);
384     
385     // make sure localhost_12918 is in OFFLINE state for [TestDB0_0, TestDB0_5]
386     Map<String, Map<String, String>> expectStateMap = new HashMap<String, Map<String, String>>();
387     Map<String, String> expectInstanceStateMap = new HashMap<String, String>();
388     expectInstanceStateMap.put("localhost_12919", "OFFLINE");
389     expectStateMap.put("TestDB0_0", expectInstanceStateMap);
390     expectStateMap.put("TestDB0_5", expectInstanceStateMap);
391     result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "==");
392     Assert.assertTrue(result, "localhost_12919" + " should be in OFFLINE for [TestDB0_0, TestDB0_5]");
393 
394 
395     // re-enable localhost_12919 for [TestDB0_0, TestDB0_5]
396     command = "--zkSvr " + ZK_ADDR +" --enablePartition true " + clusterName +
397         " localhost_12919 TestDB0 TestDB0_0 TestDB0_5";
398     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
399     result =
400         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
401                                                                                  clusterName));
402     Assert.assertTrue(result);
403 
404     // make sure localhost_12919 is NOT in OFFLINE state for [TestDB0_0, TestDB0_5]
405     result = ZkTestHelper.verifyState(_gZkClient, clusterName, "TestDB0", expectStateMap, "!=");
406     Assert.assertTrue(result,  "localhost_12919" + " should NOT be in OFFLINE");
407 
408     
409     // clean up
410     // wait for all zk callbacks done
411     Thread.sleep(1000);
412     controller.syncStop();
413     for (int i = 0; i < 5; i++)
414     {
415       participants[i].syncStop();
416     }
417 
418     System.out.println("END " + clusterName + " at "
419         + new Date(System.currentTimeMillis())); 
420   }
421 
422 }