View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Date;
24  import java.util.List;
25  
26  import org.apache.helix.TestHelper;
27  import org.apache.helix.ZNRecord;
28  import org.apache.helix.PropertyKey.Builder;
29  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
30  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
31  import org.apache.helix.mock.controller.ClusterController;
32  import org.apache.helix.mock.participant.MockParticipant;
33  import org.apache.helix.mock.participant.MockMSModelFactory;
34  import org.apache.helix.model.CurrentState;
35  import org.apache.helix.model.ExternalView;
36  import org.apache.helix.model.IdealState;
37  import org.apache.helix.model.Message;
38  import org.apache.helix.tools.ClusterSetup;
39  import org.apache.helix.tools.ClusterStateVerifier;
40  import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
41  import org.testng.Assert;
42  import org.testng.annotations.Test;
43  
44  
45  public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
46  {
47    @Test
48    public void testBasic() throws Exception
49    {
50      // Logger.getRootLogger().setLevel(Level.INFO);
51      String className = TestHelper.getTestClassName();
52      String methodName = TestHelper.getTestMethodName();
53      String clusterName = className + "_" + methodName;
54      final int n = 5;
55  
56      System.out.println("START " + clusterName + " at "
57          + new Date(System.currentTimeMillis()));
58  
59      MockParticipant[] participants = new MockParticipant[n];
60  
61      TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
62                              "localhost", // participant name prefix
63                              "TestDB", // resource name prefix
64                              1, // resources
65                              10, // partitions per resource
66                              n, // number of nodes
67                              3, // replicas
68                              "MasterSlave",
69                              true); // do rebalance
70  
71      ClusterController controller =
72          new ClusterController(clusterName, "controller_0", ZK_ADDR);
73      controller.syncStart();
74  
75      // start participants
76      for (int i = 0; i < n; i++)
77      {
78        String instanceName = "localhost_" + (12918 + i);
79  
80        participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
81        participants[i].syncStart();
82      }
83  
84      boolean result =
85          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
86                                                                                   clusterName));
87      Assert.assertTrue(result);
88  
89      // add a new idealState without registering message handling factory
90      ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
91      setupTool.addResourceToCluster(clusterName, "TestDB1", 16, "MasterSlave");
92  
93      ZkBaseDataAccessor<ZNRecord> baseAccessor =
94          new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
95      ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
96      Builder keyBuilder = accessor.keyBuilder();
97      IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB1"));
98      idealState.setStateModelFactoryName("TestDB1_Factory");
99      accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
100     setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
101 
102     // assert that we have received OFFLINE->SLAVE messages for all partitions
103     int totalMsgs = 0;
104     for (int retry = 0; retry < 5; retry++) {
105     	Thread.sleep(100);
106     	totalMsgs = 0;
107         for (int i = 0; i < n; i++) {
108         	List<Message> msgs = accessor.getChildValues(keyBuilder.messages(participants[i].getInstanceName()));
109         	totalMsgs += msgs.size();
110         }
111         
112         if (totalMsgs == 48) // partition# x replicas
113         	break;
114     }
115     
116     Assert.assertEquals(totalMsgs, 48,
117       "Should accumulated 48 unprocessed messages (1 O->S per partition per replica) because TestDB1 is added without state-model-factory but was " + totalMsgs);
118 
119     // register "TestDB1_Factory" state model factory
120     // Logger.getRootLogger().setLevel(Level.INFO);
121     for (int i = 0; i < n; i++)
122     {
123       participants[i].getManager()
124                      .getStateMachineEngine()
125                      .registerStateModelFactory("MasterSlave",
126                                                 new MockMSModelFactory(),
127                                                 "TestDB1_Factory");
128     }
129 
130     result =
131         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
132                                                                                  clusterName));
133     Assert.assertTrue(result);
134 
135     // clean up
136     // wait for all zk callbacks done
137     Thread.sleep(1000);
138     controller.syncStop();
139     for (int i = 0; i < 5; i++)
140     {
141       participants[i].syncStop();
142     }
143 
144     System.out.println("END " + clusterName + " at "
145         + new Date(System.currentTimeMillis()));
146 
147   }
148 }