1 package org.apache.helix;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.util.Arrays;
24 import java.util.List;
25 import java.util.UUID;
26
27 import org.apache.helix.ConfigChangeListener;
28 import org.apache.helix.CurrentStateChangeListener;
29 import org.apache.helix.ExternalViewChangeListener;
30 import org.apache.helix.HelixConstants;
31 import org.apache.helix.HelixDataAccessor;
32 import org.apache.helix.HelixManager;
33 import org.apache.helix.HelixManagerFactory;
34 import org.apache.helix.IdealStateChangeListener;
35 import org.apache.helix.InstanceType;
36 import org.apache.helix.LiveInstanceChangeListener;
37 import org.apache.helix.MessageListener;
38 import org.apache.helix.NotificationContext;
39 import org.apache.helix.PropertyKey.Builder;
40 import org.apache.helix.manager.zk.ZNRecordSerializer;
41 import org.apache.helix.manager.zk.ZkClient;
42 import org.apache.helix.model.CurrentState;
43 import org.apache.helix.model.ExternalView;
44 import org.apache.helix.model.IdealState;
45 import org.apache.helix.model.InstanceConfig;
46 import org.apache.helix.model.LiveInstance;
47 import org.apache.helix.model.Message;
48 import org.apache.helix.model.Message.MessageType;
49 import org.apache.helix.tools.ClusterSetup;
50 import org.testng.AssertJUnit;
51 import org.testng.annotations.AfterClass;
52 import org.testng.annotations.BeforeClass;
53 import org.testng.annotations.Test;
54
55
56 public class TestZKCallback extends ZkUnitTestBase
57 {
58 private final String clusterName = CLUSTER_PREFIX + "_" + getShortClassName();
59
60 ZkClient _zkClient;
61
62 private static String[] createArgs(String str)
63 {
64 String[] split = str.split("[ ]+");
65 System.out.println(Arrays.toString(split));
66 return split;
67 }
68
69 public class TestCallbackListener implements MessageListener, LiveInstanceChangeListener,
70 ConfigChangeListener, CurrentStateChangeListener, ExternalViewChangeListener,
71 IdealStateChangeListener
72 {
73 boolean externalViewChangeReceived = false;
74 boolean liveInstanceChangeReceived = false;
75 boolean configChangeReceived = false;
76 boolean currentStateChangeReceived = false;
77 boolean messageChangeReceived = false;
78 boolean idealStateChangeReceived = false;
79
80 @Override
81 public void onExternalViewChange(List<ExternalView> externalViewList,
82 NotificationContext changeContext)
83 {
84 externalViewChangeReceived = true;
85 }
86
87 @Override
88 public void onStateChange(String instanceName, List<CurrentState> statesInfo,
89 NotificationContext changeContext)
90 {
91 currentStateChangeReceived = true;
92 }
93
94 @Override
95 public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext)
96 {
97 configChangeReceived = true;
98 }
99
100 @Override
101 public void onLiveInstanceChange(List<LiveInstance> liveInstances,
102 NotificationContext changeContext)
103 {
104 liveInstanceChangeReceived = true;
105 }
106
107 @Override
108 public void onMessage(String instanceName, List<Message> messages,
109 NotificationContext changeContext)
110 {
111 messageChangeReceived = true;
112 }
113
114 void Reset()
115 {
116 externalViewChangeReceived = false;
117 liveInstanceChangeReceived = false;
118 configChangeReceived = false;
119 currentStateChangeReceived = false;
120 messageChangeReceived = false;
121 idealStateChangeReceived = false;
122 }
123
124 @Override
125 public void onIdealStateChange(List<IdealState> idealState, NotificationContext changeContext)
126 {
127
128 idealStateChangeReceived = true;
129 }
130 }
131
132 @Test()
133 public void testInvocation() throws Exception
134 {
135
136 HelixManager testHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
137 "localhost_8900", InstanceType.PARTICIPANT, ZK_ADDR);
138 testHelixManager.connect();
139
140 TestZKCallback test = new TestZKCallback();
141
142 TestZKCallback.TestCallbackListener testListener = test.new TestCallbackListener();
143
144 testHelixManager.addMessageListener(testListener, "localhost_8900");
145 testHelixManager.addCurrentStateChangeListener(testListener, "localhost_8900",
146 testHelixManager.getSessionId());
147 testHelixManager.addConfigChangeListener(testListener);
148 testHelixManager.addIdealStateChangeListener(testListener);
149 testHelixManager.addExternalViewChangeListener(testListener);
150 testHelixManager.addLiveInstanceChangeListener(testListener);
151
152
153 AssertJUnit.assertTrue(testListener.configChangeReceived
154 & testListener.currentStateChangeReceived & testListener.externalViewChangeReceived
155 & testListener.idealStateChangeReceived & testListener.liveInstanceChangeReceived
156 & testListener.messageChangeReceived);
157
158 testListener.Reset();
159 HelixDataAccessor accessor = testHelixManager.getHelixDataAccessor();
160 Builder keyBuilder = accessor.keyBuilder();
161
162 ExternalView extView = new ExternalView("db-12345");
163 accessor.setProperty(keyBuilder.externalView("db-12345"), extView);
164 Thread.sleep(100);
165 AssertJUnit.assertTrue(testListener.externalViewChangeReceived);
166 testListener.Reset();
167
168 CurrentState curState = new CurrentState("db-12345");
169 curState.setSessionId("sessionId");
170 curState.setStateModelDefRef("StateModelDef");
171 accessor.setProperty(keyBuilder.currentState("localhost_8900", testHelixManager.getSessionId(), curState.getId()), curState);
172 Thread.sleep(100);
173 AssertJUnit.assertTrue(testListener.currentStateChangeReceived);
174 testListener.Reset();
175
176 IdealState idealState = new IdealState("db-1234");
177 idealState.setNumPartitions(400);
178 idealState.setReplicas(Integer.toString(2));
179 idealState.setStateModelDefRef("StateModeldef");
180 accessor.setProperty(keyBuilder.idealStates("db-1234"), idealState);
181 Thread.sleep(100);
182 AssertJUnit.assertTrue(testListener.idealStateChangeReceived);
183 testListener.Reset();
184
185
186
187
188
189
190
191
192
193
194
195
196 testListener.Reset();
197 Message message = new Message(MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
198 message.setTgtSessionId("*");
199 message.setResourceName("testResource");
200 message.setPartitionName("testPartitionKey");
201 message.setStateModelDef("MasterSlave");
202 message.setToState("toState");
203 message.setFromState("fromState");
204 message.setTgtName("testTarget");
205 message.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
206
207 accessor.setProperty(keyBuilder.message("localhost_8900", message.getId()), message);
208 Thread.sleep(500);
209 AssertJUnit.assertTrue(testListener.messageChangeReceived);
210
211
212 LiveInstance liveInstance = new LiveInstance("localhost_9801");
213 liveInstance.setSessionId(UUID.randomUUID().toString());
214 liveInstance.setHelixVersion(UUID.randomUUID().toString());
215 accessor.setProperty(keyBuilder.liveInstance("localhost_9801"), liveInstance);
216 Thread.sleep(500);
217 AssertJUnit.assertTrue(testListener.liveInstanceChangeReceived);
218 testListener.Reset();
219
220
221
222
223 }
224
225 @BeforeClass()
226 public void beforeClass() throws IOException, Exception
227 {
228 _zkClient = new ZkClient(ZK_ADDR);
229 _zkClient.setZkSerializer(new ZNRecordSerializer());
230 if (_zkClient.exists("/" + clusterName))
231 {
232 _zkClient.deleteRecursive("/" + clusterName);
233 }
234
235 ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addCluster "
236 + clusterName));
237
238
239
240 ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addResource "
241 + clusterName + " db-12345 120 MasterSlave"));
242 ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
243 + " localhost:8900"));
244 ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
245 + " localhost:8901"));
246 ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
247 + " localhost:8902"));
248 ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
249 + " localhost:8903"));
250 ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -addNode " + clusterName
251 + " localhost:8904"));
252 ClusterSetup.processCommandLineArgs(createArgs("-zkSvr " + ZK_ADDR + " -rebalance "
253 + clusterName + " db-12345 3"));
254 }
255
256 @AfterClass()
257 public void afterClass()
258 {
259 _zkClient.close();
260 }
261
262 }