View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.List;
24  import java.util.Set;
25  
26  import org.apache.helix.InstanceType;
27  import org.apache.helix.PropertyPathConfig;
28  import org.apache.helix.PropertyType;
29  import org.apache.helix.TestHelper;
30  import org.apache.helix.ZkHelixTestManager;
31  import org.apache.helix.ZkTestHelper;
32  import org.apache.helix.controller.HelixControllerMain;
33  import org.apache.helix.manager.zk.CallbackHandler;
34  import org.apache.helix.manager.zk.ZKHelixManager;
35  import org.apache.helix.mock.participant.MockParticipant;
36  import org.apache.helix.tools.ClusterStateVerifier;
37  import org.apache.log4j.Logger;
38  import org.testng.Assert;
39  import org.testng.annotations.Test;
40  
41  
42  public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase
43  {
44    private static Logger LOG       =
45                                        Logger.getLogger(TestAddNodeAfterControllerStart.class);
46    final String          className = getShortClassName();
47  
48    @Test
49    public void testStandalone() throws Exception
50    {
51      String clusterName = className + "_standalone";
52      System.out.println("START " + clusterName + " at "
53          + new Date(System.currentTimeMillis()));
54  
55      final int nodeNr = 5;
56      
57      TestHelper.setupCluster(clusterName,
58                              ZK_ADDR,
59                              12918,
60                              "localhost",
61                              "TestDB",
62                              1,
63                              20,
64                              nodeNr - 1,
65                              3,
66                              "MasterSlave",
67                              true);
68  
69      MockParticipant[] participants = new MockParticipant[nodeNr];
70      for (int i = 0; i < nodeNr - 1; i++)
71      {
72        String instanceName = "localhost_" + (12918 + i);
73        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
74        new Thread(participants[i]).start();
75      }
76  
77      ZkHelixTestManager controller =
78          new ZkHelixTestManager(clusterName,
79                                              "controller_0",
80                                              InstanceType.CONTROLLER,
81                                              ZK_ADDR);
82      controller.connect();
83      boolean result;
84      result =
85          ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
86                                                                                            clusterName));
87      Assert.assertTrue(result);
88      String msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12918");
89      result = checkHandlers(controller.getHandlers(), msgPath);
90      Assert.assertTrue(result);
91  
92      _gSetupTool.addInstanceToCluster(clusterName, "localhost_12922");
93      _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 3);
94  
95      participants[nodeNr - 1] =
96          new MockParticipant(clusterName, "localhost_12922", ZK_ADDR);
97      new Thread(participants[nodeNr - 1]).start();
98      result =
99          ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
100                                                                                           clusterName));
101     Assert.assertTrue(result);
102     msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12922");
103     result = checkHandlers(controller.getHandlers(), msgPath);
104     Assert.assertTrue(result);
105 
106     // clean up
107 //    controller.disconnect();
108 //    for (int i = 0; i < nodeNr; i++)
109 //    {
110 //      participants[i].syncStop();
111 //    }
112 
113     System.out.println("END " + clusterName + " at "
114         + new Date(System.currentTimeMillis()));
115   }
116 
117   @Test
118   public void testDistributed() throws Exception
119   {
120     String clusterName = className + "_distributed";
121     System.out.println("START " + clusterName + " at "
122         + new Date(System.currentTimeMillis()));
123 
124     // setup grand cluster
125     TestHelper.setupCluster("GRAND_" + clusterName,
126                             ZK_ADDR,
127                             0,
128                             "controller",
129                             null,
130                             0,
131                             0,
132                             1,
133                             0,
134                             null,
135                             true);
136    
137     TestHelper.startController("GRAND_" + clusterName,
138                                "controller_0",
139                                ZK_ADDR,
140                                HelixControllerMain.DISTRIBUTED);
141     
142     // setup cluster
143     _gSetupTool.addCluster(clusterName, true);
144     _gSetupTool.activateCluster(clusterName, "GRAND_" + clusterName, true);    // addCluster2
145 
146     boolean result;
147     result =
148         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
149                                                                                           "GRAND_" + clusterName));
150     Assert.assertTrue(result);
151     
152     // add node/resource, and do rebalance
153     final int nodeNr = 2;
154     for (int i = 0; i < nodeNr - 1; i++)
155     {
156       int port = 12918 + i;
157       _gSetupTool.addInstanceToCluster(clusterName, "localhost_" + port);
158     }
159 
160     _gSetupTool.addResourceToCluster(clusterName, "TestDB0", 1, "LeaderStandby");
161     _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 1);
162     
163     MockParticipant[] participants = new MockParticipant[nodeNr];
164     for (int i = 0; i < nodeNr - 1; i++)
165     {
166       String instanceName = "localhost_" + (12918 + i);
167       participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
168       participants[i].syncStart();
169 //      new Thread(participants[i]).start();
170     }
171 
172     result =
173         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
174                                                                                           clusterName));
175     Assert.assertTrue(result);
176     
177     // check if controller_0 has message listener for localhost_12918
178     String msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12918");
179     int numberOfListeners = ZkTestHelper.numberOfListeners(ZK_ADDR, msgPath);
180     // System.out.println("numberOfListeners(" + msgPath + "): " + numberOfListeners);
181     Assert.assertEquals(numberOfListeners, 2);  // 1 of participant, and 1 of controller
182 
183     _gSetupTool.addInstanceToCluster(clusterName, "localhost_12919");
184     _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 2);
185 
186     participants[nodeNr - 1] =
187         new MockParticipant(clusterName, "localhost_12919", ZK_ADDR);
188     participants[nodeNr - 1].syncStart();
189 //    new Thread(participants[nodeNr - 1]).start();
190     
191     result =
192         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
193                                                                                         clusterName));
194     Assert.assertTrue(result);
195     // check if controller_0 has message listener for localhost_12919
196     msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12919");
197     numberOfListeners = ZkTestHelper.numberOfListeners(ZK_ADDR, msgPath);
198     // System.out.println("numberOfListeners(" + msgPath + "): " + numberOfListeners);
199     Assert.assertEquals(numberOfListeners, 2);  // 1 of participant, and 1 of controller
200 
201     // clean up
202 //    for (int i = 0; i < nodeNr; i++)
203 //    {
204 //      participants[i].syncStop();
205 //    }
206 
207     System.out.println("END " + clusterName + " at "
208         + new Date(System.currentTimeMillis()));
209   }
210 
211   boolean checkHandlers(List<CallbackHandler> handlers, String path)
212   {
213 //    System.out.println(handlers.size() + " handlers: ");
214     for (CallbackHandler handler : handlers)
215     {
216 //      System.out.println(handler.getPath());
217       if (handler.getPath().equals(path))
218       {
219         return true;
220       }
221     }
222     return false;
223   }
224   
225   // TODO: need to add a test case for ParticipantCodeRunner
226 }