1 package org.apache.helix.examples;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.File;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.TreeSet;
27
28 import org.I0Itec.zkclient.IDefaultNameSpace;
29 import org.I0Itec.zkclient.ZkClient;
30 import org.I0Itec.zkclient.ZkServer;
31 import org.apache.helix.HelixAdmin;
32 import org.apache.helix.HelixManager;
33 import org.apache.helix.HelixManagerFactory;
34 import org.apache.helix.InstanceType;
35 import org.apache.helix.controller.HelixControllerMain;
36 import org.apache.helix.manager.zk.ZKHelixAdmin;
37 import org.apache.helix.manager.zk.ZNRecordSerializer;
38 import org.apache.helix.model.ExternalView;
39 import org.apache.helix.model.InstanceConfig;
40 import org.apache.helix.model.StateModelDefinition;
41 import org.apache.helix.participant.StateMachineEngine;
42
43 public class Quickstart
44 {
45
46 private static String ZK_ADDRESS = "localhost:2199";
47 private static String CLUSTER_NAME = "HELIX_QUICKSTART";
48 private static int NUM_NODES = 2;
49 private static final String RESOURCE_NAME = "MyResource";
50 private static final int NUM_PARTITIONS = 6;
51 private static final int NUM_REPLICAS = 2;
52
53 private static final String STATE_MODEL_NAME = "MyStateModel";
54
55
56 private static final String SLAVE = "SLAVE";
57 private static final String OFFLINE = "OFFLINE";
58 private static final String MASTER = "MASTER";
59 private static final String DROPPED = "DROPPED";
60
61 private static List<InstanceConfig> INSTANCE_CONFIG_LIST;
62 private static List<MyProcess> PROCESS_LIST;
63 private static HelixAdmin admin;
64 static
65 {
66 INSTANCE_CONFIG_LIST = new ArrayList<InstanceConfig>();
67 PROCESS_LIST = new ArrayList<Quickstart.MyProcess>();
68 for (int i = 0; i < NUM_NODES; i++)
69 {
70 int port = 12000 + i;
71 InstanceConfig instanceConfig = new InstanceConfig("localhost_" + port);
72 instanceConfig.setHostName("localhost");
73 instanceConfig.setPort("" + port);
74 instanceConfig.setInstanceEnabled(true);
75 INSTANCE_CONFIG_LIST.add(instanceConfig);
76 }
77
78 }
79
80 public static void setup()
81 {
82 admin = new ZKHelixAdmin(ZK_ADDRESS);
83
84 echo("Creating cluster: " + CLUSTER_NAME);
85 admin.addCluster(CLUSTER_NAME, true);
86
87
88 echo("Adding " + NUM_NODES + " participants to the cluster");
89 for (int i = 0; i < NUM_NODES; i++)
90 {
91 admin.addInstance(CLUSTER_NAME, INSTANCE_CONFIG_LIST.get(i));
92 echo("\t Added participant: "
93 + INSTANCE_CONFIG_LIST.get(i).getInstanceName());
94 }
95
96
97 StateModelDefinition myStateModel = defineStateModel();
98 echo("Configuring StateModel: " + "MyStateModel with 1 Master and 1 Slave");
99 admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, myStateModel);
100
101
102 echo("Adding a resource MyResource: " + "with 6 partitions and 2 replicas");
103 admin.addResource(CLUSTER_NAME, RESOURCE_NAME, NUM_PARTITIONS,
104 STATE_MODEL_NAME, "AUTO");
105
106
107 admin.rebalance(CLUSTER_NAME, RESOURCE_NAME, NUM_REPLICAS);
108 }
109
110 private static StateModelDefinition defineStateModel()
111 {
112 StateModelDefinition.Builder builder = new StateModelDefinition.Builder(
113 STATE_MODEL_NAME);
114
115
116 builder.addState(MASTER, 1);
117 builder.addState(SLAVE, 2);
118 builder.addState(OFFLINE);
119 builder.addState(DROPPED);
120
121 builder.initialState(OFFLINE);
122
123
124 builder.addTransition(OFFLINE, SLAVE);
125 builder.addTransition(SLAVE, OFFLINE);
126 builder.addTransition(SLAVE, MASTER);
127 builder.addTransition(MASTER, SLAVE);
128 builder.addTransition(OFFLINE, DROPPED);
129
130
131
132 builder.upperBound(MASTER, 1);
133
134
135 builder.dynamicUpperBound(SLAVE, "R");
136
137 StateModelDefinition statemodelDefinition = builder.build();
138 return statemodelDefinition;
139 }
140
141 public static void startController()
142 {
143
144 echo("Starting Helix Controller");
145 HelixControllerMain.startHelixController(ZK_ADDRESS, CLUSTER_NAME,
146 "localhost_9100", HelixControllerMain.STANDALONE);
147 }
148
149 public static void startNodes() throws Exception
150 {
151 echo("Starting Participants");
152 for (int i = 0; i < NUM_NODES; i++)
153 {
154 MyProcess process = new MyProcess(INSTANCE_CONFIG_LIST.get(i).getId());
155 PROCESS_LIST.add(process);
156 process.start();
157 echo("\t Started Participant: " + INSTANCE_CONFIG_LIST.get(i).getId());
158 }
159 }
160
161 public static void startZookeeper()
162 {
163 echo("STARTING Zookeeper at " + ZK_ADDRESS);
164 IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
165 {
166 @Override
167 public void createDefaultNameSpace(ZkClient zkClient)
168 {
169 }
170 };
171 new File("/tmp/helix-quickstart").mkdirs();
172
173 ZkServer server = new ZkServer("/tmp/helix-quickstart/dataDir",
174 "/tmp/helix-quickstart/logDir", defaultNameSpace, 2199);
175 server.start();
176 }
177
178 public static void echo(Object obj)
179 {
180 System.out.println(obj);
181 }
182
183 public static void main(String[] args) throws Exception
184 {
185 startZookeeper();
186 setup();
187 startNodes();
188 startController();
189 Thread.sleep(5000);
190 printState("After starting 2 nodes");
191 addNode();
192 Thread.sleep(5000);
193 printState("After adding a third node");
194 stopNode();
195 Thread.sleep(5000);
196 printState("After the 3rd node stops/crashes");
197 Thread.currentThread().join();
198 System.exit(0);
199 }
200
201 private static void addNode() throws Exception
202 {
203
204 NUM_NODES = NUM_NODES + 1;
205 int port = 12000 + NUM_NODES - 1;
206 InstanceConfig instanceConfig = new InstanceConfig("localhost_" + port);
207 instanceConfig.setHostName("localhost");
208 instanceConfig.setPort("" + port);
209 instanceConfig.setInstanceEnabled(true);
210 echo("ADDING NEW NODE :" + instanceConfig.getInstanceName()+ ". Partitions will move from old nodes to the new node.");
211 admin.addInstance(CLUSTER_NAME, instanceConfig);
212 INSTANCE_CONFIG_LIST.add(instanceConfig);
213 MyProcess process = new MyProcess(instanceConfig.getInstanceName());
214 PROCESS_LIST.add(process);
215 admin.rebalance(CLUSTER_NAME, RESOURCE_NAME, 3);
216 process.start();
217 }
218
219 private static void stopNode()
220 {
221 int nodeId = NUM_NODES - 1;
222 echo("STOPPING " + INSTANCE_CONFIG_LIST.get(nodeId).getInstanceName()+". Mastership will be transferred to the remaining nodes");
223 PROCESS_LIST.get(nodeId).stop();
224 }
225
226 private static void printState(String msg)
227 {
228 System.out.println("CLUSTER STATE: "+ msg);
229 ExternalView resourceExternalView = admin.getResourceExternalView(
230 CLUSTER_NAME, RESOURCE_NAME);
231 TreeSet<String> sortedSet = new TreeSet<String>(
232 resourceExternalView.getPartitionSet());
233 StringBuilder sb = new StringBuilder("\t\t");
234 for (int i = 0; i < NUM_NODES; i++)
235 {
236 sb.append(INSTANCE_CONFIG_LIST.get(i).getInstanceName()).append("\t");
237 }
238 System.out.println(sb);
239 for (String partitionName : sortedSet)
240 {
241 sb.delete(0, sb.length() - 1);
242 sb.append(partitionName).append("\t");
243 for (int i = 0; i < NUM_NODES; i++)
244 {
245 Map<String, String> stateMap = resourceExternalView
246 .getStateMap(partitionName);
247 if (stateMap != null
248 && stateMap.containsKey(INSTANCE_CONFIG_LIST.get(i)
249 .getInstanceName()))
250 {
251 sb.append(
252 stateMap.get(INSTANCE_CONFIG_LIST.get(i).getInstanceName())
253 .charAt(0)).append("\t\t");
254 } else
255 {
256 sb.append("-").append("\t\t");
257 }
258 }
259 System.out.println(sb);
260 }
261 System.out.println("###################################################################");
262 }
263
264 static final class MyProcess
265 {
266 private final String instanceName;
267 private HelixManager manager;
268
269 public MyProcess(String instanceName)
270 {
271 this.instanceName = instanceName;
272 }
273
274 public void start() throws Exception
275 {
276 manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
277 instanceName, InstanceType.PARTICIPANT, ZK_ADDRESS);
278
279 MasterSlaveStateModelFactory stateModelFactory = new MasterSlaveStateModelFactory(
280 instanceName);
281
282 StateMachineEngine stateMach = manager.getStateMachineEngine();
283 stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory);
284 manager.connect();
285 }
286
287 public void stop()
288 {
289 manager.disconnect();
290 }
291 }
292
293 }