1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23
24 import org.apache.helix.HelixManager;
25 import org.apache.helix.NotificationContext;
26 import org.apache.helix.TestHelper;
27 import org.apache.helix.HelixConstants.ChangeType;
28 import org.apache.helix.NotificationContext.Type;
29 import org.apache.helix.PropertyKey.Builder;
30 import org.apache.helix.controller.HelixControllerMain;
31 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
32 import org.apache.helix.manager.zk.ZNRecordSerializer;
33 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
34 import org.apache.helix.manager.zk.ZkClient;
35 import org.apache.helix.mock.participant.MockJobIntf;
36 import org.apache.helix.mock.participant.MockParticipant;
37 import org.apache.helix.model.LiveInstance;
38 import org.apache.helix.participant.CustomCodeCallbackHandler;
39 import org.apache.helix.participant.HelixCustomCodeRunner;
40 import org.apache.helix.tools.ClusterStateVerifier;
41 import org.testng.Assert;
42 import org.testng.annotations.Test;
43
44
45 public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase
46 {
47 private final String _clusterName = "CLUSTER_" + getShortClassName();
48 private final int _nodeNb = 5;
49 private final int _startPort = 12918;
50 private final MockCallback _callback = new MockCallback();
51
52 class MockCallback implements CustomCodeCallbackHandler
53 {
54 boolean _isCallbackInvoked;
55
56 @Override
57 public void onCallback(NotificationContext context)
58 {
59 HelixManager manager = context.getManager();
60 Type type = context.getType();
61 _isCallbackInvoked = true;
62
63 }
64
65 }
66
67 class MockJob implements MockJobIntf
68 {
69 @Override
70 public void doPreConnectJob(HelixManager manager)
71 {
72 try
73 {
74
75
76 if (manager.getInstanceName().equals("localhost_12918"))
77 {
78 Thread.sleep(2000);
79 }
80
81 HelixCustomCodeRunner customCodeRunner = new HelixCustomCodeRunner(manager, ZK_ADDR);
82 customCodeRunner.invoke(_callback)
83 .on(ChangeType.LIVE_INSTANCE)
84 .usingLeaderStandbyModel("TestParticLeader")
85 .start();
86 } catch (Exception e)
87 {
88
89 e.printStackTrace();
90 }
91 }
92
93 @Override
94 public void doPostConnectJob(HelixManager manager)
95 {
96
97
98 }
99
100 }
101
102 @Test
103 public void testCustomCodeRunner() throws Exception
104 {
105 System.out.println("START " + _clusterName + " at " + new Date(System.currentTimeMillis()));
106
107 TestHelper.setupCluster(_clusterName,
108 ZK_ADDR,
109 _startPort,
110 "localhost",
111 "TestDB",
112 1,
113 5,
114 _nodeNb,
115 _nodeNb,
116 "MasterSlave",
117 true);
118
119 TestHelper.startController(_clusterName,
120 "controller_0",
121 ZK_ADDR,
122 HelixControllerMain.STANDALONE);
123
124 MockParticipant[] partics = new MockParticipant[5];
125 for (int i = 0; i < _nodeNb; i++)
126 {
127 String instanceName = "localhost_" + (_startPort + i);
128
129 partics[i] = new MockParticipant(_clusterName, instanceName, ZK_ADDR,
130 null, new MockJob());
131 partics[i].syncStart();
132
133 }
134 boolean result = ClusterStateVerifier.verifyByPolling(
135 new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
136 Assert.assertTrue(result);
137
138 Thread.sleep(1000);
139 Assert.assertTrue(_callback._isCallbackInvoked);
140 _callback._isCallbackInvoked = false;
141
142
143 ZkClient zkClient = new ZkClient(ZK_ADDR);
144 zkClient.setZkSerializer(new ZNRecordSerializer());
145 ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(zkClient));
146 Builder keyBuilder = accessor.keyBuilder();
147
148 LiveInstance newLiveIns = new LiveInstance("newLiveInstance");
149 newLiveIns.setHelixVersion("0.0.0");
150 newLiveIns.setSessionId("randomSessionId");
151 accessor.setProperty(keyBuilder.liveInstance("newLiveInstance"), newLiveIns);
152
153 Thread.sleep(1000);
154 Assert.assertTrue(_callback._isCallbackInvoked);
155
156 System.out.println("END " + _clusterName + " at " + new Date(System.currentTimeMillis()));
157 }
158 }