View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  
24  import org.apache.helix.HelixManager;
25  import org.apache.helix.NotificationContext;
26  import org.apache.helix.TestHelper;
27  import org.apache.helix.mock.controller.ClusterController;
28  import org.apache.helix.mock.participant.MockParticipant;
29  import org.apache.helix.mock.participant.MockTransition;
30  import org.apache.helix.model.Message;
31  import org.apache.helix.tools.ClusterSetup;
32  import org.apache.helix.tools.ClusterStateVerifier;
33  import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
34  import org.testng.Assert;
35  import org.testng.annotations.Test;
36  
37  
38  public class TestEnablePartitionDuringDisable extends ZkIntegrationTestBase
39  {
40    static
41    {
42      // Logger.getRootLogger().setLevel(Level.INFO);
43    }
44  
45    class EnablePartitionTransition extends MockTransition
46    {
47      int slaveToOfflineCnt = 0;
48      int offlineToSlave = 0;
49  
50      @Override
51      public void doTransition(Message message, NotificationContext context)
52      {
53        HelixManager manager = context.getManager();
54        String clusterName = manager.getClusterName();
55  
56        String instance = message.getTgtName();
57        String partitionName = message.getPartitionName();
58        String fromState = message.getFromState();
59        String toState = message.getToState();
60        if (instance.equals("localhost_12919") && partitionName.equals("TestDB0_0"))
61        {
62          if (fromState.equals("SLAVE") && toState.equals("OFFLINE"))
63          {
64            slaveToOfflineCnt++;
65  
66            try
67            {
68              String command =
69                  "--zkSvr " + ZK_ADDR + " --enablePartition true " + clusterName
70                      + " localhost_12919 TestDB0 TestDB0_0";
71  
72              ClusterSetup.processCommandLineArgs(command.split("\\s+"));
73            }
74            catch (Exception e)
75            {
76              // TODO Auto-generated catch block
77              e.printStackTrace();
78            }
79  
80          }
81          else if (slaveToOfflineCnt > 0 && fromState.equals("OFFLINE") && toState.equals("SLAVE"))
82          {
83            offlineToSlave++;
84          }
85        }
86      }
87  
88    }
89  
90    @Test
91    public void testEnablePartitionDuringDisable() throws Exception
92    {
93      // Logger.getRootLogger().setLevel(Level.INFO);
94      String className = TestHelper.getTestClassName();
95      String methodName = TestHelper.getTestMethodName();
96      String clusterName = className + "_" + methodName;
97  
98      System.out.println("START " + clusterName + " at "
99          + new Date(System.currentTimeMillis()));
100 
101     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
102                             "localhost", // participant name prefix
103                             "TestDB", // resource name prefix
104                             1, // resources
105                             10, // partitions per resource
106                             5, // number of nodes
107                             3, // replicas
108                             "MasterSlave",
109                             true); // do rebalance
110 
111     ClusterController controller =
112         new ClusterController(clusterName, "controller_0", ZK_ADDR);
113     controller.syncStart();
114 
115     // start participants
116     EnablePartitionTransition transition = new EnablePartitionTransition();
117     MockParticipant[] participants = new MockParticipant[5];
118     for (int i = 0; i < 5; i++)
119     {
120       String instanceName = "localhost_" + (12918 + i);
121 
122       if (instanceName.equals("localhost_12919"))
123       {
124         participants[i] =
125             new MockParticipant(clusterName,
126                                 instanceName,
127                                 ZK_ADDR,
128                                 transition);
129       }
130       else
131       {
132         participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
133       }
134       participants[i].syncStart();
135     }
136 
137     boolean result =
138         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
139                                                                                  clusterName));
140     Assert.assertTrue(result);
141 
142     // disable partitions
143     String command =
144         "--zkSvr " + ZK_ADDR + " --enablePartition false " + clusterName
145             + " localhost_12919 TestDB0 TestDB0_0";
146 
147     ClusterSetup.processCommandLineArgs(command.split("\\s+"));
148 
149     // ensure we get 1 slaveToOffline and 1 offlineToSlave after disable partition
150     long startT = System.currentTimeMillis();
151     while (System.currentTimeMillis() - startT < 10000)  // retry in 5s
152     {
153       if (transition.slaveToOfflineCnt > 0 && transition.offlineToSlave > 0)
154       {
155         break;
156       }
157 
158       Thread.sleep(100);
159     }
160     long endT = System.currentTimeMillis();
161     System.out.println("1 disable and re-enable took: " + (endT - startT) + "ms");
162     Assert.assertEquals(transition.slaveToOfflineCnt, 1, "should get 1 slaveToOffline transition");
163     Assert.assertEquals(transition.offlineToSlave, 1, "should get 1 offlineToSlave transition");
164 
165     // clean up
166     // wait for all zk callbacks done
167     Thread.sleep(1000);
168     controller.syncStop();
169     for (int i = 0; i < 5; i++)
170     {
171       participants[i].syncStop();
172     }
173 
174     System.out.println("END " + clusterName + " at "
175         + new Date(System.currentTimeMillis()));
176 
177   }
178 }