1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.Map;
26 import java.util.Map.Entry;
27
28 import org.apache.helix.TestHelper;
29 import org.apache.helix.TestHelper.StartCMResult;
30 import org.apache.helix.controller.HelixControllerMain;
31 import org.apache.helix.manager.zk.ZNRecordSerializer;
32 import org.apache.helix.manager.zk.ZkClient;
33 import org.apache.helix.tools.ClusterSetup;
34 import org.apache.helix.tools.ClusterStateVerifier;
35 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
36 import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
37 import org.apache.log4j.Logger;
38 import org.testng.Assert;
39 import org.testng.annotations.AfterClass;
40 import org.testng.annotations.BeforeClass;
41
42
43
44
45
46
47
48
49 public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase
50 {
51 private static Logger LOG =
52 Logger.getLogger(ZkStandAloneCMTestBase.class);
53
54 protected static final int NODE_NR = 5;
55 protected static final int START_PORT = 12918;
56 protected static final String STATE_MODEL = "MasterSlave";
57 protected static final String TEST_DB = "TestDB";
58 protected static final int _PARTITIONS = 20;
59
60 protected ClusterSetup _setupTool = null;
61 protected final String CLASS_NAME = getShortClassName();
62 protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_"
63 + CLASS_NAME;
64
65 protected Map<String, StartCMResult> _startCMResultMap =
66 new HashMap<String, StartCMResult>();
67 protected ZkClient _zkClient;
68
69 int _replica = 3;
70
71 @BeforeClass
72 public void beforeClass() throws Exception
73 {
74
75 System.out.println("START " + CLASS_NAME + " at "
76 + new Date(System.currentTimeMillis()));
77
78 _zkClient = new ZkClient(ZK_ADDR);
79 _zkClient.setZkSerializer(new ZNRecordSerializer());
80 String namespace = "/" + CLUSTER_NAME;
81 if (_zkClient.exists(namespace))
82 {
83 _zkClient.deleteRecursive(namespace);
84 }
85 _setupTool = new ClusterSetup(ZK_ADDR);
86
87
88 _setupTool.addCluster(CLUSTER_NAME, true);
89 _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL);
90 for (int i = 0; i < NODE_NR; i++)
91 {
92 String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
93 _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
94 }
95 _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
96
97
98 for (int i = 0; i < NODE_NR; i++)
99 {
100 String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
101 if (_startCMResultMap.get(instanceName) != null)
102 {
103 LOG.error("fail to start particpant:" + instanceName
104 + "(participant with same name already exists)");
105 }
106 else
107 {
108 StartCMResult result =
109 TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
110 _startCMResultMap.put(instanceName, result);
111 }
112 }
113
114
115 String controllerName = CONTROLLER_PREFIX + "_0";
116 StartCMResult startResult =
117 TestHelper.startController(CLUSTER_NAME,
118 controllerName,
119 ZK_ADDR,
120 HelixControllerMain.STANDALONE);
121 _startCMResultMap.put(controllerName, startResult);
122
123 boolean result =
124 ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
125 CLUSTER_NAME));
126
127 result =
128 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
129 CLUSTER_NAME));
130 Assert.assertTrue(result);
131 }
132
133 @AfterClass
134 public void afterClass() throws Exception
135 {
136
137
138
139
140 StartCMResult result;
141 Iterator<Entry<String, StartCMResult>> it = _startCMResultMap.entrySet().iterator();
142 while (it.hasNext())
143 {
144 String instanceName = it.next().getKey();
145 if (instanceName.startsWith(CONTROLLER_PREFIX))
146 {
147 result = _startCMResultMap.get(instanceName);
148 result._manager.disconnect();
149 result._thread.interrupt();
150 it.remove();
151 }
152 }
153
154 Thread.sleep(100);
155 it = _startCMResultMap.entrySet().iterator();
156 while (it.hasNext())
157 {
158 String instanceName = it.next().getKey();
159 result = _startCMResultMap.get(instanceName);
160 result._manager.disconnect();
161 result._thread.interrupt();
162 it.remove();
163 }
164
165 _zkClient.close();
166
167 System.out.println("END " + CLASS_NAME + " at "
168 + new Date(System.currentTimeMillis()));
169 }
170 }