View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Arrays;
23  import java.util.Date;
24  import java.util.HashMap;
25  import java.util.Map;
26  import java.util.Set;
27  
28  import org.apache.helix.TestHelper;
29  import org.apache.helix.controller.HelixControllerMain;
30  import org.apache.helix.manager.zk.ZKHelixAdmin;
31  import org.apache.helix.mock.participant.MockParticipant;
32  import org.apache.helix.mock.participant.ErrTransition;
33  import org.apache.helix.tools.ClusterStateVerifier;
34  import org.testng.Assert;
35  import org.testng.annotations.Test;
36  
37  
38  public class TestErrorPartition extends ZkIntegrationTestBase
39  {
40    @Test()
41    public void testErrorPartition() throws Exception
42    {
43      String clusterName = getShortClassName();
44      MockParticipant[] participants = new MockParticipant[5];
45  
46      System.out.println("START testErrorPartition() at "
47          + new Date(System.currentTimeMillis()));
48      ZKHelixAdmin tool = new ZKHelixAdmin(_gZkClient);
49  
50      TestHelper.setupCluster(clusterName,
51                              ZK_ADDR,
52                              12918,
53                              "localhost",
54                              "TestDB",
55                              1,
56                              10,
57                              5,
58                              3,
59                              "MasterSlave",
60                              true);
61  
62      TestHelper.startController(clusterName,
63                                 "controller_0",
64                                 ZK_ADDR,
65                                 HelixControllerMain.STANDALONE);
66      for (int i = 0; i < 5; i++)
67      {
68        String instanceName = "localhost_" + (12918 + i);
69  
70        if (i == 0)
71        {
72          Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>()
73          {
74            {
75              put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
76            }
77          };
78          participants[i] =
79              new MockParticipant(clusterName,
80                                  instanceName,
81                                  ZK_ADDR,
82                                  new ErrTransition(errPartitions));
83        }
84        else
85        {
86          participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
87        }
88        participants[i].syncStart();
89        // new Thread(participants[i]).start();
90      }
91  
92      Map<String, Map<String, String>> errStates =
93          new HashMap<String, Map<String, String>>();
94      errStates.put("TestDB0", new HashMap<String, String>());
95      errStates.get("TestDB0").put("TestDB0_4", "localhost_12918");
96      boolean result =
97          ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
98                                                                                            clusterName,
99                                                                                            errStates));
100     Assert.assertTrue(result);
101 
102     Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>()
103     {
104       {
105         put("TestDB0_4", TestHelper.setOf("localhost_12918"));
106       }
107     };
108 
109     // verify "TestDB0_0", "localhost_12918" is in ERROR state
110     TestHelper.verifyState(clusterName, ZK_ADDR, errorStateMap, "ERROR");
111 
112     // disable a partition on a node with error state
113     tool.enablePartition(false, clusterName, "localhost_12918", "TestDB0", Arrays.asList("TestDB0_4"));
114 
115     result =
116         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
117                                                                                           clusterName,
118                                                                                           errStates));
119     Assert.assertTrue(result);
120 
121     TestHelper.verifyState(clusterName, ZK_ADDR, errorStateMap, "ERROR");
122 
123     // disable a node with error state
124     tool.enableInstance(clusterName, "localhost_12918", false);
125 
126     result =
127         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
128                                                                                           clusterName,
129                                                                                           errStates));
130     Assert.assertTrue(result);
131 
132     // make sure after restart stale ERROR state is gone
133     tool.enablePartition(true, clusterName, "localhost_12918", "TestDB0", Arrays.asList("TestDB0_4"));
134     tool.enableInstance(clusterName, "localhost_12918", true);
135 
136     participants[0].syncStop();
137     result =
138         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
139                                                                                           clusterName));
140     Assert.assertTrue(result);
141     participants[0] = new MockParticipant(clusterName, "localhost_12918", ZK_ADDR);
142     new Thread(participants[0]).start();
143 
144     result =
145         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
146                                                                                           clusterName));
147     Assert.assertTrue(result);
148 
149     System.out.println("END testErrorPartition() at "
150         + new Date(System.currentTimeMillis()));
151   }
152 }