1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23
24 import org.apache.helix.TestHelper;
25 import org.apache.helix.ZNRecord;
26 import org.apache.helix.PropertyKey.Builder;
27 import org.apache.helix.controller.HelixControllerMain;
28 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
29 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
30 import org.apache.helix.mock.controller.ClusterController;
31 import org.apache.helix.mock.participant.MockParticipant;
32 import org.apache.helix.model.LiveInstance;
33 import org.apache.helix.tools.ClusterStateVerifier;
34 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
35 import org.testng.Assert;
36 import org.testng.annotations.Test;
37
38
39 public class TestDistributedClusterController extends ZkIntegrationTestBase
40 {
41
42 @Test
43 public void testDistributedClusterController() throws Exception
44 {
45 String className = TestHelper.getTestClassName();
46 String methodName = TestHelper.getTestMethodName();
47 String clusterNamePrefix = className + "_" + methodName;
48 final int n = 5;
49 final int clusterNb = 10;
50
51 System.out.println("START " + clusterNamePrefix + " at "
52 + new Date(System.currentTimeMillis()));
53
54
55 for (int i = 0; i < clusterNb; i++)
56 {
57 String clusterName = clusterNamePrefix + "0_" + i;
58 String participantName = "localhost" + i;
59 String resourceName = "TestDB" + i;
60 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918,
61 participantName,
62 resourceName,
63 1,
64 8,
65 n,
66 3,
67 "MasterSlave",
68 true);
69 }
70
71
72 final String controllerClusterName = "CONTROLLER_" + clusterNamePrefix;
73 TestHelper.setupCluster("CONTROLLER_" + clusterNamePrefix, ZK_ADDR, 0,
74
75 "controller",
76 clusterNamePrefix,
77 1,
78 clusterNb,
79 n,
80 3,
81 "LeaderStandby",
82 true);
83
84
85 ClusterController[] controllers = new ClusterController[n];
86 for (int i = 0; i < n; i++)
87 {
88 controllers[i] =
89 new ClusterController(controllerClusterName,
90 "controller_" + i,
91 ZK_ADDR,
92 HelixControllerMain.DISTRIBUTED.toString());
93 controllers[i].syncStart();
94 }
95
96 boolean result =
97 ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
98 controllerClusterName),
99 30000);
100 Assert.assertTrue(result, "Controller cluster NOT in ideal state");
101
102
103 MockParticipant[] participants = new MockParticipant[n];
104 final String firstClusterName = clusterNamePrefix + "0_0";
105 for (int i = 0; i < n; i++)
106 {
107 String instanceName = "localhost0_" + (12918 + i);
108 participants[i] =
109 new MockParticipant(firstClusterName, instanceName, ZK_ADDR, null);
110 participants[i].syncStart();
111 }
112
113 result =
114 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
115 firstClusterName));
116 Assert.assertTrue(result, "first cluster NOT in ideal state");
117
118
119
120 ZkBaseDataAccessor<ZNRecord> baseAccessor =
121 new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
122 ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(controllerClusterName, baseAccessor);
123 Builder keyBuilder = accessor.keyBuilder();
124 LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
125 String leaderName = leader.getId();
126 int j = Integer.parseInt(leaderName.substring(leaderName.lastIndexOf('_') + 1));
127 controllers[j].syncStop();
128
129
130
131 MockParticipant[] participants2 = new MockParticipant[n];
132 final String secondClusterName = clusterNamePrefix + "0_1";
133 for (int i = 0; i < n; i++)
134 {
135 String instanceName = "localhost1_" + (12918 + i);
136 participants2[i] =
137 new MockParticipant(secondClusterName, instanceName, ZK_ADDR, null);
138 participants2[i].syncStart();
139 }
140
141 result =
142 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
143 secondClusterName));
144 Assert.assertTrue(result, "second cluster NOT in ideal state");
145
146
147
148 System.out.println("Cleaning up...");
149 Thread.sleep(1000);
150 for (int i = 0; i < 5; i++)
151 {
152 result =
153 ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
154 controllerClusterName));
155 controllers[i].syncStop();
156 }
157
158 for (int i = 0; i < 5; i++)
159 {
160 participants[i].syncStop();
161 }
162
163 System.out.println("END " + clusterNamePrefix + " at "
164 + new Date(System.currentTimeMillis()));
165
166 }
167 }