1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23 import java.util.List;
24 import java.util.Set;
25
26 import org.apache.helix.InstanceType;
27 import org.apache.helix.PropertyPathConfig;
28 import org.apache.helix.PropertyType;
29 import org.apache.helix.TestHelper;
30 import org.apache.helix.ZkHelixTestManager;
31 import org.apache.helix.ZkTestHelper;
32 import org.apache.helix.controller.HelixControllerMain;
33 import org.apache.helix.manager.zk.CallbackHandler;
34 import org.apache.helix.manager.zk.ZKHelixManager;
35 import org.apache.helix.mock.participant.MockParticipant;
36 import org.apache.helix.tools.ClusterStateVerifier;
37 import org.apache.log4j.Logger;
38 import org.testng.Assert;
39 import org.testng.annotations.Test;
40
41
42 public class TestAddNodeAfterControllerStart extends ZkIntegrationTestBase
43 {
44 private static Logger LOG =
45 Logger.getLogger(TestAddNodeAfterControllerStart.class);
46 final String className = getShortClassName();
47
48 @Test
49 public void testStandalone() throws Exception
50 {
51 String clusterName = className + "_standalone";
52 System.out.println("START " + clusterName + " at "
53 + new Date(System.currentTimeMillis()));
54
55 final int nodeNr = 5;
56
57 TestHelper.setupCluster(clusterName,
58 ZK_ADDR,
59 12918,
60 "localhost",
61 "TestDB",
62 1,
63 20,
64 nodeNr - 1,
65 3,
66 "MasterSlave",
67 true);
68
69 MockParticipant[] participants = new MockParticipant[nodeNr];
70 for (int i = 0; i < nodeNr - 1; i++)
71 {
72 String instanceName = "localhost_" + (12918 + i);
73 participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
74 new Thread(participants[i]).start();
75 }
76
77 ZkHelixTestManager controller =
78 new ZkHelixTestManager(clusterName,
79 "controller_0",
80 InstanceType.CONTROLLER,
81 ZK_ADDR);
82 controller.connect();
83 boolean result;
84 result =
85 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
86 clusterName));
87 Assert.assertTrue(result);
88 String msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12918");
89 result = checkHandlers(controller.getHandlers(), msgPath);
90 Assert.assertTrue(result);
91
92 _gSetupTool.addInstanceToCluster(clusterName, "localhost_12922");
93 _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 3);
94
95 participants[nodeNr - 1] =
96 new MockParticipant(clusterName, "localhost_12922", ZK_ADDR);
97 new Thread(participants[nodeNr - 1]).start();
98 result =
99 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
100 clusterName));
101 Assert.assertTrue(result);
102 msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12922");
103 result = checkHandlers(controller.getHandlers(), msgPath);
104 Assert.assertTrue(result);
105
106
107
108
109
110
111
112
113 System.out.println("END " + clusterName + " at "
114 + new Date(System.currentTimeMillis()));
115 }
116
117 @Test
118 public void testDistributed() throws Exception
119 {
120 String clusterName = className + "_distributed";
121 System.out.println("START " + clusterName + " at "
122 + new Date(System.currentTimeMillis()));
123
124
125 TestHelper.setupCluster("GRAND_" + clusterName,
126 ZK_ADDR,
127 0,
128 "controller",
129 null,
130 0,
131 0,
132 1,
133 0,
134 null,
135 true);
136
137 TestHelper.startController("GRAND_" + clusterName,
138 "controller_0",
139 ZK_ADDR,
140 HelixControllerMain.DISTRIBUTED);
141
142
143 _gSetupTool.addCluster(clusterName, true);
144 _gSetupTool.activateCluster(clusterName, "GRAND_" + clusterName, true);
145
146 boolean result;
147 result =
148 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
149 "GRAND_" + clusterName));
150 Assert.assertTrue(result);
151
152
153 final int nodeNr = 2;
154 for (int i = 0; i < nodeNr - 1; i++)
155 {
156 int port = 12918 + i;
157 _gSetupTool.addInstanceToCluster(clusterName, "localhost_" + port);
158 }
159
160 _gSetupTool.addResourceToCluster(clusterName, "TestDB0", 1, "LeaderStandby");
161 _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 1);
162
163 MockParticipant[] participants = new MockParticipant[nodeNr];
164 for (int i = 0; i < nodeNr - 1; i++)
165 {
166 String instanceName = "localhost_" + (12918 + i);
167 participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
168 participants[i].syncStart();
169
170 }
171
172 result =
173 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
174 clusterName));
175 Assert.assertTrue(result);
176
177
178 String msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12918");
179 int numberOfListeners = ZkTestHelper.numberOfListeners(ZK_ADDR, msgPath);
180
181 Assert.assertEquals(numberOfListeners, 2);
182
183 _gSetupTool.addInstanceToCluster(clusterName, "localhost_12919");
184 _gSetupTool.rebalanceStorageCluster(clusterName, "TestDB0", 2);
185
186 participants[nodeNr - 1] =
187 new MockParticipant(clusterName, "localhost_12919", ZK_ADDR);
188 participants[nodeNr - 1].syncStart();
189
190
191 result =
192 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
193 clusterName));
194 Assert.assertTrue(result);
195
196 msgPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, "localhost_12919");
197 numberOfListeners = ZkTestHelper.numberOfListeners(ZK_ADDR, msgPath);
198
199 Assert.assertEquals(numberOfListeners, 2);
200
201
202
203
204
205
206
207 System.out.println("END " + clusterName + " at "
208 + new Date(System.currentTimeMillis()));
209 }
210
211 boolean checkHandlers(List<CallbackHandler> handlers, String path)
212 {
213
214 for (CallbackHandler handler : handlers)
215 {
216
217 if (handler.getPath().equals(path))
218 {
219 return true;
220 }
221 }
222 return false;
223 }
224
225
226 }