View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  
28  import org.I0Itec.zkclient.DataUpdater;
29  import org.I0Itec.zkclient.IZkChildListener;
30  import org.I0Itec.zkclient.IZkDataListener;
31  import org.apache.helix.BaseDataAccessor;
32  import org.apache.helix.HelixDataAccessor;
33  import org.apache.helix.TestHelper;
34  import org.apache.helix.ZNRecord;
35  import org.apache.helix.PropertyKey.Builder;
36  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
37  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
38  import org.apache.helix.mock.controller.ClusterController;
39  import org.apache.helix.mock.participant.MockParticipant;
40  import org.apache.helix.mock.participant.ErrTransition;
41  import org.apache.helix.model.ExternalView;
42  import org.apache.helix.model.InstanceConfig;
43  import org.apache.helix.model.builder.CustomModeISBuilder;
44  import org.apache.helix.model.builder.IdealStateBuilder;
45  import org.apache.helix.tools.ClusterSetup;
46  import org.apache.helix.tools.ClusterStateVerifier;
47  import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
48  import org.apache.zookeeper.data.Stat;
49  import org.testng.Assert;
50  import org.testng.annotations.Test;
51  
52  
53  public class TestDrop extends ZkIntegrationTestBase
54  {
55    @Test
56    public void testDropErrorPartitionAutoIS() throws Exception
57    {
58      // Logger.getRootLogger().setLevel(Level.INFO);
59      String className = TestHelper.getTestClassName();
60      String methodName = TestHelper.getTestMethodName();
61      String clusterName = className + "_" + methodName;
62      final int n = 5;
63  
64      System.out.println("START " + clusterName + " at "
65          + new Date(System.currentTimeMillis()));
66  
67      MockParticipant[] participants = new MockParticipant[n];
68  
69      TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
70                              "localhost", // participant name prefix
71                              "TestDB", // resource name prefix
72                              1, // resources
73                              10, // partitions per resource
74                              n, // number of nodes
75                              3, // replicas
76                              "MasterSlave",
77                              true); // do rebalance
78  
79      // start controller
80      ClusterController controller =
81          new ClusterController(clusterName, "controller_0", ZK_ADDR);
82      controller.syncStart();
83      
84      // start participants
85      Map<String, Set<String>> errTransitions = new HashMap<String, Set<String>>();
86      errTransitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
87      errTransitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
88  
89      for (int i = 0; i < n; i++)
90      {
91        String instanceName = "localhost_" + (12918 + i);
92  
93        if (i == 0)
94        {
95          participants[i] =
96              new MockParticipant(clusterName,
97                                  instanceName,
98                                  ZK_ADDR,
99                                  new ErrTransition(errTransitions));
100       }
101       else
102       {
103         participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
104       }
105       participants[i].syncStart();
106     }
107 
108     Map<String, Map<String, String>> errStateMap =
109         new HashMap<String, Map<String, String>>();
110     errStateMap.put("TestDB0", new HashMap<String, String>());
111     errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
112     errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
113     boolean result =
114         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
115                                                                                  clusterName,
116                                                                                  errStateMap));
117     Assert.assertTrue(result);
118     
119     // drop resource containing error partitions should drop the partition successfully
120     ClusterSetup.processCommandLineArgs(new String[]{"--zkSvr", ZK_ADDR, "--dropResource", clusterName, "TestDB0"});
121 
122 
123     // make sure TestDB0_4 and TestDB0_8 partitions are dropped
124     result =
125         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
126                                                                                  clusterName));
127     Assert.assertTrue(result);
128     
129     
130     // clean up
131     // wait for all zk callbacks done
132 //    Thread.sleep(1000);
133 //    controller.syncStop();
134 //    for (int i = 0; i < 5; i++)
135 //    {
136 //      participants[i].syncStop();
137 //    }
138 
139     System.out.println("END " + clusterName + " at "
140         + new Date(System.currentTimeMillis())); 
141   }
142   
143   @Test
144   public void testDropErrorPartitionFailedAutoIS() throws Exception
145   {
146     // Logger.getRootLogger().setLevel(Level.INFO);
147     String className = TestHelper.getTestClassName();
148     String methodName = TestHelper.getTestMethodName();
149     String clusterName = className + "_" + methodName;
150     final int n = 5;
151 
152     System.out.println("START " + clusterName + " at "
153         + new Date(System.currentTimeMillis()));
154 
155     MockParticipant[] participants = new MockParticipant[n];
156 
157     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
158                             "localhost", // participant name prefix
159                             "TestDB", // resource name prefix
160                             1, // resources
161                             8, // partitions per resource
162                             n, // number of nodes
163                             3, // replicas
164                             "MasterSlave",
165                             true); // do rebalance
166 
167     // start controller
168     ClusterController controller =
169         new ClusterController(clusterName, "controller_0", ZK_ADDR);
170     controller.syncStart();
171     
172     // start participants
173     Map<String, Set<String>> errTransitions = new HashMap<String, Set<String>>();
174     errTransitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
175     errTransitions.put("ERROR-DROPPED", TestHelper.setOf("TestDB0_4"));
176 
177     for (int i = 0; i < n; i++)
178     {
179       String instanceName = "localhost_" + (12918 + i);
180 
181       if (i == 0)
182       {
183         participants[i] =
184             new MockParticipant(clusterName,
185                                 instanceName,
186                                 ZK_ADDR,
187                                 new ErrTransition(errTransitions));
188       }
189       else
190       {
191         participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
192       }
193       participants[i].syncStart();
194     }
195 
196     Map<String, Map<String, String>> errStateMap =
197         new HashMap<String, Map<String, String>>();
198     errStateMap.put("TestDB0", new HashMap<String, String>());
199     errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
200     boolean result =
201         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
202                                                                                  clusterName,
203                                                                                  errStateMap));
204     Assert.assertTrue(result);
205     
206     // drop resource containing error partitions should invoke error->dropped transition
207     // if error happens during error->dropped transition, partition should be disabled
208     ClusterSetup.processCommandLineArgs(new String[]{"--zkSvr", ZK_ADDR, "--dropResource", clusterName, "TestDB0"});
209 
210 
211     // make sure TestDB0_4 stay in ERROR state and is disabled
212     result =
213         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
214                                                                                  clusterName,
215                                                                                  errStateMap));
216     Assert.assertTrue(result);
217     
218     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
219     Builder keyBuilder = accessor.keyBuilder();
220     InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig("localhost_12918"));
221     List<String> disabledPartitions = config.getDisabledPartitions();
222     // System.out.println("disabledPartitions: " + disabledPartitions);
223     Assert.assertEquals(disabledPartitions.size(), 1, "TestDB0_4 should be disabled");
224     Assert.assertEquals(disabledPartitions.get(0), "TestDB0_4");
225     
226     // clean up
227     // wait for all zk callbacks done
228 //    Thread.sleep(1000);
229 //    controller.syncStop();
230 //    for (int i = 0; i < 5; i++)
231 //    {
232 //      participants[i].syncStop();
233 //    }
234 
235     System.out.println("END " + clusterName + " at "
236         + new Date(System.currentTimeMillis())); 
237   }
238   
239   @Test
240   public void testDropErrorPartitionCustomIS() throws Exception
241   {
242     // Logger.getRootLogger().setLevel(Level.INFO);
243     String className = TestHelper.getTestClassName();
244     String methodName = TestHelper.getTestMethodName();
245     String clusterName = className + "_" + methodName;
246     final int n = 2;
247 
248     System.out.println("START " + clusterName + " at "
249         + new Date(System.currentTimeMillis()));
250 
251     MockParticipant[] participants = new MockParticipant[n];
252 
253     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
254                             "localhost", // participant name prefix
255                             "TestDB", // resource name prefix
256                             1, // resources
257                             2, // partitions per resource
258                             n, // number of nodes
259                             2, // replicas
260                             "MasterSlave",
261                             false); // do rebalance
262 
263     // set custom ideal-state
264     CustomModeISBuilder isBuilder = new CustomModeISBuilder("TestDB0");
265     isBuilder.setNumPartitions(2);
266     isBuilder.setNumReplica(2);
267     isBuilder.setStateModel("MasterSlave");
268     isBuilder.assignInstanceAndState("TestDB0_0", "localhost_12918", "MASTER");
269     isBuilder.assignInstanceAndState("TestDB0_0", "localhost_12919", "SLAVE");
270     isBuilder.assignInstanceAndState("TestDB0_1", "localhost_12919", "MASTER");
271     isBuilder.assignInstanceAndState("TestDB0_1", "localhost_12918", "SLAVE");
272     
273     HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
274         new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
275     Builder keyBuiler = accessor.keyBuilder();
276     accessor.setProperty(keyBuiler.idealStates("TestDB0"), isBuilder.build());
277     
278     // start controller
279     ClusterController controller =
280         new ClusterController(clusterName, "controller_0", ZK_ADDR);
281     controller.syncStart();
282     
283     // start participants
284     Map<String, Set<String>> errTransitions = new HashMap<String, Set<String>>();
285     errTransitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_0"));
286 
287     for (int i = 0; i < n; i++)
288     {
289       String instanceName = "localhost_" + (12918 + i);
290 
291       if (i == 0)
292       {
293         participants[i] =
294             new MockParticipant(clusterName,
295                                 instanceName,
296                                 ZK_ADDR,
297                                 new ErrTransition(errTransitions));
298       }
299       else
300       {
301         participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
302       }
303       participants[i].syncStart();
304     }
305 
306     Map<String, Map<String, String>> errStateMap =
307         new HashMap<String, Map<String, String>>();
308     errStateMap.put("TestDB0", new HashMap<String, String>());
309     errStateMap.get("TestDB0").put("TestDB0_0", "localhost_12918");
310     boolean result =
311         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
312                                                                                  clusterName,
313                                                                                  errStateMap));
314     Assert.assertTrue(result);
315     
316     // drop resource containing error partitions should drop the partition successfully
317     ClusterSetup.processCommandLineArgs(new String[]{"--zkSvr", ZK_ADDR, "--dropResource", clusterName, "TestDB0"});
318 
319 
320     // make sure TestDB0_0 partition is dropped
321     result =
322         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
323                                                                                  clusterName));
324     Assert.assertTrue(result, "Should be empty exeternal-view");
325     
326     System.out.println("END " + clusterName + " at "
327         + new Date(System.currentTimeMillis())); 
328   }
329   
330   @Test
331   public void testDropSchemataResource() throws Exception
332   {
333     // Logger.getRootLogger().setLevel(Level.INFO);
334     String className = TestHelper.getTestClassName();
335     String methodName = TestHelper.getTestMethodName();
336     String clusterName = className + "_" + methodName;
337     final int n = 5;
338 
339     System.out.println("START " + clusterName + " at "
340         + new Date(System.currentTimeMillis()));
341 
342     MockParticipant[] participants = new MockParticipant[n];
343 
344     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
345                             "localhost", // participant name prefix
346                             "TestDB", // resource name prefix
347                             1, // resources
348                             8, // partitions per resource
349                             n, // number of nodes
350                             3, // replicas
351                             "MasterSlave",
352                             true); // do rebalance
353 
354     // start controller
355     ClusterController controller =
356         new ClusterController(clusterName, "controller_0", ZK_ADDR);
357     controller.syncStart();
358     
359     // start participants
360     for (int i = 0; i < n; i++)
361     {
362       String instanceName = "localhost_" + (12918 + i);
363 
364       participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
365       participants[i].syncStart();
366     }
367 
368     boolean result =
369         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
370                                                                                  clusterName));
371     Assert.assertTrue(result);
372 
373     // add schemata resource group
374     String command = "--zkSvr " + ZK_ADDR +" --addResource " + clusterName + 
375         " schemata 1 STORAGE_DEFAULT_SM_SCHEMATA";
376     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
377     command = "--zkSvr " + ZK_ADDR +" --rebalance " + clusterName + 
378         " schemata " + n;
379     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
380     
381     result =
382         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
383                                                                                  clusterName));
384     Assert.assertTrue(result);
385     
386     // drop schemata resource group
387     System.out.println("Dropping schemata resource group...");
388     command = "--zkSvr " + ZK_ADDR +" --dropResource " + clusterName + 
389         " schemata";
390     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
391     result =
392         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
393                                                                                  clusterName));
394     Assert.assertTrue(result);
395 
396     // make sure schemata external view is empty
397     ZkBaseDataAccessor<ZNRecord> baseAccessor =
398         new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
399     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
400     Builder keyBuilder = accessor.keyBuilder();
401     ExternalView extView = accessor.getProperty(keyBuilder.externalView("schemata"));
402     Assert.assertEquals(extView.getPartitionSet().size(), 0, "schemata externalView should be empty but was \"" + extView + "\"");
403     
404     // clean up
405     // wait for all zk callbacks done
406     Thread.sleep(1000);
407     controller.syncStop();
408     for (int i = 0; i < 5; i++)
409     {
410       participants[i].syncStop();
411     }
412 
413     System.out.println("END " + clusterName + " at "
414         + new Date(System.currentTimeMillis())); 
415   }
416 }