1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Date;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28
29 import org.apache.helix.AccessOption;
30 import org.apache.helix.model.ConfigScope;
31 import org.apache.helix.model.HelixConfigScope;
32 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
33 import org.apache.helix.model.builder.ConfigScopeBuilder;
34 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
35 import org.apache.helix.HelixAdmin;
36 import org.apache.helix.HelixDataAccessor;
37 import org.apache.helix.HelixException;
38 import org.apache.helix.InstanceType;
39 import org.apache.helix.LiveInstanceInfoProvider;
40 import org.apache.helix.PropertyKey.Builder;
41 import org.apache.helix.TestHelper;
42 import org.apache.helix.ZNRecord;
43 import org.apache.helix.ZkHelixTestManager;
44 import org.apache.helix.ZkTestHelper;
45 import org.apache.helix.ZkUnitTestBase;
46 import org.apache.helix.manager.MockListener;
47 import org.apache.helix.manager.zk.ZKHelixManager;
48 import org.apache.helix.model.LiveInstance;
49 import org.apache.helix.store.zk.ZkHelixPropertyStore;
50 import org.apache.zookeeper.data.Stat;
51 import org.testng.Assert;
52 import org.testng.AssertJUnit;
53 import org.testng.annotations.Test;
54
55
56 public class TestZkClusterManager extends ZkUnitTestBase
57 {
58 final String className = getShortClassName();
59
60 @Test()
61 public void testController() throws Exception
62 {
63 System.out.println("START " + className + ".testController() at " + new Date(System.currentTimeMillis()));
64 final String clusterName = CLUSTER_PREFIX + "_" + className + "_controller";
65
66
67 if (_gZkClient.exists("/" + clusterName))
68 {
69 _gZkClient.deleteRecursive("/" + clusterName);
70 }
71
72 ZKHelixManager controller = new ZKHelixManager(clusterName, null,
73 InstanceType.CONTROLLER,
74 ZK_ADDR);
75 try
76 {
77 controller.connect();
78 Assert.fail("Should throw HelixException if initial cluster structure is not setup");
79 } catch (HelixException e)
80 {
81
82 }
83
84 TestHelper.setupEmptyCluster(_gZkClient, clusterName);
85
86 controller.connect();
87 AssertJUnit.assertTrue(controller.isConnected());
88 controller.connect();
89 AssertJUnit.assertTrue(controller.isConnected());
90
91 MockListener listener = new MockListener();
92 listener.reset();
93
94 try
95 {
96 controller.addControllerListener(null);
97 Assert.fail("Should throw HelixException");
98 } catch (HelixException e)
99 {
100
101 }
102
103 Builder keyBuilder = new Builder(controller.getClusterName());
104 controller.addControllerListener(listener);
105 AssertJUnit.assertTrue(listener.isControllerChangeListenerInvoked);
106 controller.removeListener(keyBuilder.controller(), listener);
107
108 ZkHelixPropertyStore<ZNRecord> store = controller.getHelixPropertyStore();
109 ZNRecord record = new ZNRecord("node_1");
110 int options = 0;
111 store.set("/node_1", record, AccessOption.PERSISTENT);
112 Stat stat = new Stat();
113 record = store.get("/node_1",stat, options);
114 AssertJUnit.assertEquals("node_1", record.getId());
115
116 controller.getMessagingService();
117 controller.getHealthReportCollector();
118 controller.getClusterManagmentTool();
119
120 controller.handleNewSession();
121 controller.disconnect();
122 AssertJUnit.assertFalse(controller.isConnected());
123
124 System.out.println("END " + className + ".testController() at " + new Date(System.currentTimeMillis()));
125 }
126
127 @Test
128 public void testLiveInstanceInfoProvider() throws Exception
129 {
130 System.out.println("START " + className + ".testLiveInstanceInfoProvider() at " + new Date(System.currentTimeMillis()));
131 final String clusterName = CLUSTER_PREFIX + "_" + className + "_liveInstanceInfoProvider";
132 class provider implements LiveInstanceInfoProvider
133 {
134 boolean _flag = false;
135 public provider(boolean genSessionId)
136 {
137 _flag = genSessionId;
138 }
139 @Override
140 public ZNRecord getAdditionalLiveInstanceInfo()
141 {
142 ZNRecord record = new ZNRecord("info");
143 record.setSimpleField("simple", "value");
144 List<String> listFieldVal = new ArrayList<String>();
145 listFieldVal.add("val1");
146 listFieldVal.add("val2");
147 listFieldVal.add("val3");
148 record.setListField("list", listFieldVal);
149 Map<String,String> mapFieldVal = new HashMap<String, String>();
150 mapFieldVal.put("k1", "val1");
151 mapFieldVal.put("k2","val2");
152 mapFieldVal.put("k3","val3");
153 record.setMapField("map", mapFieldVal);
154 if(_flag)
155 {
156 record.setSimpleField("SESSION_ID", "value");
157 record.setSimpleField("LIVE_INSTANCE", "value");
158 record.setSimpleField("Others", "value");
159 }
160 return record;
161 }
162 }
163
164
165 TestHelper.setupEmptyCluster(_gZkClient, clusterName);
166 int[] ids = {0,1,2,3, 4, 5};
167 setupInstances(clusterName, ids);
168
169
170 ZKHelixManager manager = new ZKHelixManager(clusterName, "localhost_0",
171 InstanceType.PARTICIPANT,
172 ZK_ADDR);
173 manager.connect();
174 HelixDataAccessor accessor = manager.getHelixDataAccessor();
175
176 LiveInstance liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_0"));
177 Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 0);
178 Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 0);
179 Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 3);
180
181 manager = new ZKHelixManager(clusterName, "localhost_1",
182 InstanceType.PARTICIPANT,
183 ZK_ADDR);
184 manager.setLiveInstanceInfoProvider(new provider(false));
185
186 manager.connect();
187 accessor = manager.getHelixDataAccessor();
188
189 liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_1"));
190 Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
191 Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
192 Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 4);
193
194 manager = new ZKHelixManager(clusterName, "localhost_2",
195 InstanceType.PARTICIPANT,
196 ZK_ADDR);
197 manager.setLiveInstanceInfoProvider(new provider(true));
198
199 manager.connect();
200 accessor = manager.getHelixDataAccessor();
201
202 liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_2"));
203 Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
204 Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
205 Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
206 Assert.assertFalse(liveInstance.getSessionId().equals("value"));
207 Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
208
209
210
211 ZkHelixTestManager manager2 = new ZkHelixTestManager(clusterName, "localhost_3",
212 InstanceType.PARTICIPANT,
213 ZK_ADDR);
214 manager2.setLiveInstanceInfoProvider(new provider(true));
215
216 manager2.connect();
217 accessor = manager2.getHelixDataAccessor();
218
219 liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
220 Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
221 Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
222 Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
223 Assert.assertFalse(liveInstance.getSessionId().equals("value"));
224 Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
225 String sessionId = liveInstance.getSessionId();
226
227 ZkTestHelper.expireSession(manager2.getZkClient());
228 Thread.sleep(1000);
229
230 liveInstance = accessor.getProperty(accessor.keyBuilder().liveInstance("localhost_3"));
231 Assert.assertTrue(liveInstance.getRecord().getListFields().size() == 1);
232 Assert.assertTrue(liveInstance.getRecord().getMapFields().size() == 1);
233 Assert.assertTrue(liveInstance.getRecord().getSimpleFields().size() == 5);
234 Assert.assertFalse(liveInstance.getSessionId().equals("value"));
235 Assert.assertFalse(liveInstance.getLiveInstance().equals("value"));
236 Assert.assertFalse(sessionId.equals(liveInstance.getSessionId()));
237
238 System.out.println("END " + className + ".testLiveInstanceInfoProvider() at " + new Date(System.currentTimeMillis()));
239 }
240
241 @Test()
242 public void testAdministrator() throws Exception
243 {
244 System.out.println("START " + className + ".testAdministrator() at " + new Date(System.currentTimeMillis()));
245 final String clusterName = CLUSTER_PREFIX + "_" + className + "_admin";
246
247
248 if (_gZkClient.exists("/" + clusterName))
249 {
250 _gZkClient.deleteRecursive("/" + clusterName);
251 }
252
253 ZKHelixManager admin = new ZKHelixManager(clusterName, null,
254 InstanceType.ADMINISTRATOR,
255 ZK_ADDR);
256
257 TestHelper.setupEmptyCluster(_gZkClient, clusterName);
258
259 admin.connect();
260 AssertJUnit.assertTrue(admin.isConnected());
261
262 HelixAdmin adminTool = admin.getClusterManagmentTool();
263
264
265 HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.PARTITION)
266 .forCluster(clusterName)
267 .forResource("testResource")
268 .forPartition("testPartition")
269 .build();
270
271 Map<String, String> properties = new HashMap<String, String>();
272 properties.put("pKey1", "pValue1");
273 properties.put("pKey2", "pValue2");
274 adminTool.setConfig(scope, properties);
275
276 properties = adminTool.getConfig(scope, Arrays.asList("pKey1", "pKey2"));
277 Assert.assertEquals(properties.size(), 2);
278 Assert.assertEquals(properties.get("pKey1"), "pValue1");
279 Assert.assertEquals(properties.get("pKey2"), "pValue2");
280
281 admin.disconnect();
282 AssertJUnit.assertFalse(admin.isConnected());
283
284 System.out.println("END " + className + ".testAdministrator() at " + new Date(System.currentTimeMillis()));
285 }
286 }