View Javadoc

1   package org.apache.helix;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.Arrays;
24  import java.util.List;
25  import java.util.UUID;
26  
27  import org.apache.helix.ConfigChangeListener;
28  import org.apache.helix.CurrentStateChangeListener;
29  import org.apache.helix.ExternalViewChangeListener;
30  import org.apache.helix.HelixConstants;
31  import org.apache.helix.HelixDataAccessor;
32  import org.apache.helix.HelixManager;
33  import org.apache.helix.HelixManagerFactory;
34  import org.apache.helix.IdealStateChangeListener;
35  import org.apache.helix.InstanceType;
36  import org.apache.helix.LiveInstanceChangeListener;
37  import org.apache.helix.MessageListener;
38  import org.apache.helix.NotificationContext;
39  import org.apache.helix.PropertyKey.Builder;
40  import org.apache.helix.manager.zk.ZNRecordSerializer;
41  import org.apache.helix.manager.zk.ZkClient;
42  import org.apache.helix.model.CurrentState;
43  import org.apache.helix.model.ExternalView;
44  import org.apache.helix.model.IdealState;
45  import org.apache.helix.model.InstanceConfig;
46  import org.apache.helix.model.LiveInstance;
47  import org.apache.helix.model.Message;
48  import org.apache.helix.model.Message.MessageType;
49  import org.apache.helix.tools.ClusterSetup;
50  import org.testng.AssertJUnit;
51  import org.testng.annotations.AfterClass;
52  import org.testng.annotations.BeforeClass;
53  import org.testng.annotations.Test;
54  
55  
56  public class TestZKCallback extends ZkUnitTestBase
57  {
58    private final String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
59  
60    ZkClient _zkClient;
61  
62    private static String[] createArgs(String str)
63    {
64      String[] split = str.split("[ ]+");
65      System.out.println(Arrays.toString(split));
66      return split;
67    }
68  
69    public class TestCallbackListener implements MessageListener, LiveInstanceChangeListener,
70        ConfigChangeListener, CurrentStateChangeListener, ExternalViewChangeListener,
71        IdealStateChangeListener
72    {
73      boolean externalViewChangeReceived = false;
74      boolean liveInstanceChangeReceived = false;
75      boolean configChangeReceived = false;
76      boolean currentStateChangeReceived = false;
77      boolean messageChangeReceived = false;
78      boolean idealStateChangeReceived = false;
79  
80      @Override
81      public void onExternalViewChange(List<ExternalView> externalViewList,
82          NotificationContext changeContext)
83      {
84        externalViewChangeReceived = true;
85      }
86  
87      @Override
88      public void onStateChange(String instanceName, List<CurrentState> statesInfo,
89          NotificationContext changeContext)
90      {
91        currentStateChangeReceived = true;
92      }
93  
94      @Override
95      public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext)
96      {
97        configChangeReceived = true;
98      }
99  
100     @Override
101     public void onLiveInstanceChange(List<LiveInstance> liveInstances,
102         NotificationContext changeContext)
103     {
104       liveInstanceChangeReceived = true;
105     }
106 
107     @Override
108     public void onMessage(String instanceName, List<Message> messages,
109         NotificationContext changeContext)
110     {
111       messageChangeReceived = true;
112     }
113 
114     void Reset()
115     {
116       externalViewChangeReceived = false;
117       liveInstanceChangeReceived = false;
118       configChangeReceived = false;
119       currentStateChangeReceived = false;
120       messageChangeReceived = false;
121       idealStateChangeReceived = false;
122     }
123 
124     @Override
125     public void onIdealStateChange(List<IdealState> idealState, NotificationContext changeContext)
126     {
127       // TODO Auto-generated method stub
128       idealStateChangeReceived = true;
129     }
130   }
131 
132   @Test()
133   public void testInvocation() throws Exception
134   {
135 
136     HelixManager testHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
137         "localhost_8900", InstanceType.PARTICIPANT, ZK_ADDR);
138     testHelixManager.connect();
139 
140     TestZKCallback test = new TestZKCallback();
141 
142     TestZKCallback.TestCallbackListener testListener = test.new TestCallbackListener();
143 
144     testHelixManager.addMessageListener(testListener, "localhost_8900");
145     testHelixManager.addCurrentStateChangeListener(testListener, "localhost_8900",
146         testHelixManager.getSessionId());
147     testHelixManager.addConfigChangeListener(testListener);
148     testHelixManager.addIdealStateChangeListener(testListener);
149     testHelixManager.addExternalViewChangeListener(testListener);
150     testHelixManager.addLiveInstanceChangeListener(testListener);
151     // Initial add listener should trigger the first execution of the
152     // listener callbacks
153     AssertJUnit.assertTrue(testListener.configChangeReceived
154         & testListener.currentStateChangeReceived & testListener.externalViewChangeReceived
155         & testListener.idealStateChangeReceived & testListener.liveInstanceChangeReceived
156         & testListener.messageChangeReceived);
157 
158     testListener.Reset();
159     HelixDataAccessor accessor = testHelixManager.getHelixDataAccessor();
160     Builder keyBuilder = accessor.keyBuilder();
161     
162     ExternalView extView = new ExternalView("db-12345");
163     accessor.setProperty(keyBuilder.externalView("db-12345"), extView);
164     Thread.sleep(100);
165     AssertJUnit.assertTrue(testListener.externalViewChangeReceived);
166     testListener.Reset();
167 
168     CurrentState curState = new CurrentState("db-12345");
169     curState.setSessionId("sessionId");
170     curState.setStateModelDefRef("StateModelDef");
171     accessor.setProperty(keyBuilder.currentState("localhost_8900", testHelixManager.getSessionId(), curState.getId()), curState);
172     Thread.sleep(100);
173     AssertJUnit.assertTrue(testListener.currentStateChangeReceived);
174     testListener.Reset();
175 
176     IdealState idealState = new IdealState("db-1234");
177     idealState.setNumPartitions(400);
178     idealState.setReplicas(Integer.toString(2));
179     idealState.setStateModelDefRef("StateModeldef");
180     accessor.setProperty(keyBuilder.idealStates("db-1234"), idealState);
181     Thread.sleep(100);
182     AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
183     testListener.Reset();
184 
185     // dummyRecord = new ZNRecord("db-12345");
186     // dataAccessor.setProperty(PropertyType.IDEALSTATES, idealState, "db-12345"
187     // );
188     // Thread.sleep(100);
189     // AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
190     // testListener.Reset();
191 
192     // dummyRecord = new ZNRecord("localhost:8900");
193     // List<ZNRecord> recList = new ArrayList<ZNRecord>();
194     // recList.add(dummyRecord);
195 
196     testListener.Reset();
197     Message message = new Message(MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
198     message.setTgtSessionId("*");
199     message.setResourceName("testResource");
200     message.setPartitionName("testPartitionKey");
201     message.setStateModelDef("MasterSlave");
202     message.setToState("toState");
203     message.setFromState("fromState");
204     message.setTgtName("testTarget");
205     message.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
206 
207     accessor.setProperty(keyBuilder.message("localhost_8900", message.getId()), message);
208     Thread.sleep(500);
209     AssertJUnit.assertTrue(testListener.messageChangeReceived);
210 
211     // dummyRecord = new ZNRecord("localhost_9801");
212     LiveInstance liveInstance = new LiveInstance("localhost_9801");
213     liveInstance.setSessionId(UUID.randomUUID().toString());
214     liveInstance.setHelixVersion(UUID.randomUUID().toString());
215     accessor.setProperty(keyBuilder.liveInstance("localhost_9801"), liveInstance);
216     Thread.sleep(500);
217     AssertJUnit.assertTrue(testListener.liveInstanceChangeReceived);
218     testListener.Reset();
219 
220     // dataAccessor.setNodeConfigs(recList); Thread.sleep(100);
221     // AssertJUnit.assertTrue(testListener.configChangeReceived);
222     // testListener.Reset();
223   }
224 
225   @BeforeClass()
226   public void beforeClass() throws IOException, Exception
227   {
228     _zkClient = new ZkClient(ZK_ADDR);
229     _zkClient.setZkSerializer(new ZNRecordSerializer());
230     if (_zkClient.exists("/" + clusterName))
231     {
232       _zkClient.deleteRecursive("/" + clusterName);
233     }
234 
235     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addCluster "
236         + clusterName));
237     // ClusterSetup
238     // .processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR +
239     // " -addCluster relay-cluster-12345"));
240     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addResource "
241         + clusterName + " db-12345 120 MasterSlave"));
242     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
243         + " localhost:8900"));
244     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
245         + " localhost:8901"));
246     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
247         + " localhost:8902"));
248     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
249         + " localhost:8903"));
250     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
251         + " localhost:8904"));
252     ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -rebalance "
253         + clusterName + " db-12345 3"));
254   }
255 
256   @AfterClass()
257   public void afterClass()
258   {
259     _zkClient.close();
260   }
261 
262 }