1 package org.apache.helix.mock.participant;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 import org.apache.commons.cli.CommandLine;
24 import org.apache.commons.cli.CommandLineParser;
25 import org.apache.commons.cli.GnuParser;
26 import org.apache.commons.cli.HelpFormatter;
27 import org.apache.commons.cli.Option;
28 import org.apache.commons.cli.OptionBuilder;
29 import org.apache.commons.cli.OptionGroup;
30 import org.apache.commons.cli.Options;
31 import org.apache.commons.cli.ParseException;
32 import org.apache.helix.HelixManager;
33 import org.apache.helix.HelixManagerFactory;
34 import org.apache.helix.InstanceType;
35 import org.apache.helix.NotificationContext;
36 import org.apache.helix.model.Message;
37 import org.apache.helix.participant.StateMachineEngine;
38 import org.apache.helix.participant.statemachine.StateModel;
39 import org.apache.helix.participant.statemachine.StateModelFactory;
40 import org.apache.log4j.Logger;
41
42
43 public class DummyProcess
44 {
45 private static final Logger logger = Logger.getLogger(DummyProcess.class);
46 public static final String zkServer = "zkSvr";
47 public static final String cluster = "cluster";
48 public static final String hostAddress = "host";
49 public static final String hostPort = "port";
50 public static final String relayCluster = "relayCluster";
51 public static final String help = "help";
52 public static final String transDelay = "transDelay";
53 public static final String helixManagerType = "helixManagerType";
54
55
56 private final String _zkConnectString;
57 private final String _clusterName;
58 private final String _instanceName;
59 private DummyStateModelFactory stateModelFactory;
60
61
62
63 private int _transDelayInMs = 0;
64 private final String _clusterMangerType;
65
/**
 * Captures the settings of a dummy participant. Nothing is created or connected here;
 * that happens in {@link #start()}.
 *
 * @param zkConnectString   ZooKeeper connect string
 * @param clusterName       name of the cluster to join
 * @param instanceName      name this participant registers under
 * @param clusterMangerType cluster manager flavour, evaluated by {@link #start()}
 * @param delay             artificial state-transition delay in milliseconds; values
 *                          below zero are stored as 0
 */
public DummyProcess(String zkConnectString,
                    String clusterName,
                    String instanceName,
                    String clusterMangerType,
                    int delay)
{
  this._zkConnectString = zkConnectString;
  this._clusterName = clusterName;
  this._instanceName = instanceName;
  this._clusterMangerType = clusterMangerType;
  // Clamp so that a negative delay behaves like "no delay".
  this._transDelayInMs = Math.max(delay, 0);
}
78
79 static void sleep(long transDelay)
80 {
81 try
82 {
83 if (transDelay > 0)
84 {
85 Thread.sleep(transDelay);
86 }
87 }
88 catch (InterruptedException e)
89 {
90
91 e.printStackTrace();
92 }
93 }
94
95 public HelixManager start() throws Exception
96 {
97 HelixManager manager = null;
98
99 if (_clusterMangerType.equalsIgnoreCase("zk"))
100 {
101 manager = HelixManagerFactory.getZKHelixManager(_clusterName,
102 _instanceName,
103 InstanceType.PARTICIPANT,
104 _zkConnectString);
105 }
106 else
107 {
108 throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType);
109 }
110
111 stateModelFactory = new DummyStateModelFactory(_transDelayInMs);
112 DummyLeaderStandbyStateModelFactory stateModelFactory1 = new DummyLeaderStandbyStateModelFactory(_transDelayInMs);
113 DummyOnlineOfflineStateModelFactory stateModelFactory2 = new DummyOnlineOfflineStateModelFactory(_transDelayInMs);
114
115 StateMachineEngine stateMach = manager.getStateMachineEngine();
116 stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
117 stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
118 stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
119
120 manager.connect();
121
122 return manager;
123 }
124
125 public static class DummyStateModelFactory extends StateModelFactory<DummyStateModel>
126 {
127 int _delay;
128
129 public DummyStateModelFactory(int delay)
130 {
131 _delay = delay;
132 }
133
134 @Override
135 public DummyStateModel createNewStateModel(String stateUnitKey)
136 {
137 DummyStateModel model = new DummyStateModel();
138 model.setDelay(_delay);
139 return model;
140 }
141 }
142
143 public static class DummyLeaderStandbyStateModelFactory extends StateModelFactory<DummyLeaderStandbyStateModel>
144 {
145 int _delay;
146
147 public DummyLeaderStandbyStateModelFactory(int delay)
148 {
149 _delay = delay;
150 }
151
152 @Override
153 public DummyLeaderStandbyStateModel createNewStateModel(String stateUnitKey)
154 {
155 DummyLeaderStandbyStateModel model = new DummyLeaderStandbyStateModel();
156 model.setDelay(_delay);
157 return model;
158 }
159 }
160
161 public static class DummyOnlineOfflineStateModelFactory extends StateModelFactory<DummyOnlineOfflineStateModel>
162 {
163 int _delay;
164
165 public DummyOnlineOfflineStateModelFactory(int delay)
166 {
167 _delay = delay;
168 }
169
170 @Override
171 public DummyOnlineOfflineStateModel createNewStateModel(String stateUnitKey)
172 {
173 DummyOnlineOfflineStateModel model = new DummyOnlineOfflineStateModel();
174 model.setDelay(_delay);
175 return model;
176 }
177 }
178 public static class DummyStateModel extends StateModel
179 {
180 int _transDelay = 0;
181
182 public void setDelay(int delay)
183 {
184 _transDelay = delay > 0 ? delay : 0;
185 }
186
187 public void onBecomeSlaveFromOffline(Message message,
188 NotificationContext context)
189 {
190 String db = message.getPartitionName();
191 String instanceName = context.getManager().getInstanceName();
192 DummyProcess.sleep(_transDelay);
193
194 logger.info("DummyStateModel.onBecomeSlaveFromOffline(), instance:" + instanceName
195 + ", db:" + db);
196 }
197
198 public void onBecomeSlaveFromMaster(Message message,
199 NotificationContext context)
200 {
201 DummyProcess.sleep(_transDelay);
202
203 logger.info("DummyStateModel.onBecomeSlaveFromMaster()");
204
205 }
206
207 public void onBecomeMasterFromSlave(Message message,
208 NotificationContext context)
209 {
210 DummyProcess.sleep(_transDelay);
211
212 logger.info("DummyStateModel.onBecomeMasterFromSlave()");
213
214 }
215
216 public void onBecomeOfflineFromSlave(Message message,
217 NotificationContext context)
218 {
219 DummyProcess.sleep(_transDelay);
220
221 logger.info("DummyStateModel.onBecomeOfflineFromSlave()");
222
223 }
224
225 public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
226 {
227 DummyProcess.sleep(_transDelay);
228
229 logger.info("DummyStateModel.onBecomeDroppedFromOffline()");
230
231 }
232 }
233
234
235 public static class DummyOnlineOfflineStateModel extends StateModel
236 {
237 int _transDelay = 0;
238
239 public void setDelay(int delay)
240 {
241 _transDelay = delay > 0 ? delay : 0;
242 }
243
244 public void onBecomeOnlineFromOffline(Message message,
245 NotificationContext context)
246 {
247 String db = message.getPartitionName();
248 String instanceName = context.getManager().getInstanceName();
249 DummyProcess.sleep(_transDelay);
250
251 logger.info("DummyStateModel.onBecomeOnlineFromOffline(), instance:" + instanceName
252 + ", db:" + db);
253 }
254
255 public void onBecomeOfflineFromOnline(Message message,
256 NotificationContext context)
257 {
258 DummyProcess.sleep(_transDelay);
259
260 logger.info("DummyStateModel.onBecomeOfflineFromOnline()");
261
262 }
263 public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
264 {
265 DummyProcess.sleep(_transDelay);
266
267 logger.info("DummyStateModel.onBecomeDroppedFromOffline()");
268
269 }
270 }
271
272 public static class DummyLeaderStandbyStateModel extends StateModel
273 {
274 int _transDelay = 0;
275
276 public void setDelay(int delay)
277 {
278 _transDelay = delay > 0 ? delay : 0;
279 }
280
281 public void onBecomeLeaderFromStandby(Message message,
282 NotificationContext context)
283 {
284 String db = message.getPartitionName();
285 String instanceName = context.getManager().getInstanceName();
286 DummyProcess.sleep(_transDelay);
287 logger.info("DummyLeaderStandbyStateModel.onBecomeLeaderFromStandby(), instance:" + instanceName
288 + ", db:" + db);
289 }
290
291 public void onBecomeStandbyFromLeader(Message message,
292 NotificationContext context)
293 {
294 DummyProcess.sleep(_transDelay);
295
296 logger.info("DummyLeaderStandbyStateModel.onBecomeStandbyFromLeader()");
297
298 }
299 public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
300 {
301 DummyProcess.sleep(_transDelay);
302
303 logger.info("DummyLeaderStandbyStateModel.onBecomeDroppedFromOffline()");
304
305 }
306
307 public void onBecomeStandbyFromOffline(Message message, NotificationContext context)
308 {
309 DummyProcess.sleep(_transDelay);
310
311 logger.info("DummyLeaderStandbyStateModel.onBecomeStandbyFromOffline()");
312
313 }
314
315 public void onBecomeOfflineFromStandby(Message message, NotificationContext context)
316 {
317 DummyProcess.sleep(_transDelay);
318
319 logger.info("DummyLeaderStandbyStateModel.onBecomeOfflineFromStandby()");
320
321 }
322 }
323
324
325 @SuppressWarnings("static-access")
326 synchronized private static Options constructCommandLineOptions()
327 {
328 Option helpOption = OptionBuilder.withLongOpt(help)
329 .withDescription("Prints command-line options info").create();
330
331 Option clusterOption = OptionBuilder.withLongOpt(cluster)
332 .withDescription("Provide cluster name").create();
333 clusterOption.setArgs(1);
334 clusterOption.setRequired(true);
335 clusterOption.setArgName("Cluster name (Required)");
336
337 Option hostOption = OptionBuilder.withLongOpt(hostAddress)
338 .withDescription("Provide host name").create();
339 hostOption.setArgs(1);
340 hostOption.setRequired(true);
341 hostOption.setArgName("Host name (Required)");
342
343 Option portOption = OptionBuilder.withLongOpt(hostPort)
344 .withDescription("Provide host port").create();
345 portOption.setArgs(1);
346 portOption.setRequired(true);
347 portOption.setArgName("Host port (Required)");
348
349 Option cmTypeOption = OptionBuilder.withLongOpt(helixManagerType)
350 .withDescription("Provide cluster manager type (e.g. 'zk', 'static-file', or 'dynamic-file'").create();
351 cmTypeOption.setArgs(1);
352 cmTypeOption.setRequired(true);
353 cmTypeOption.setArgName("Clsuter manager type (e.g. 'zk', 'static-file', or 'dynamic-file') (Required)");
354
355 Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
356 .withDescription("Provide zookeeper address").create();
357 zkServerOption.setArgs(1);
358 zkServerOption.setRequired(true);
359 zkServerOption.setArgName("ZookeeperServerAddress(Required for zk-based cluster manager)");
360
361
362
363
364
365
366
367
368 Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
369 .withDescription("Provide state trans delay").create();
370 transDelayOption.setArgs(1);
371 transDelayOption.setRequired(false);
372 transDelayOption.setArgName("Delay time in state transition, in MS");
373
374 OptionGroup optionGroup = new OptionGroup();
375 optionGroup.addOption(zkServerOption);
376
377 Options options = new Options();
378 options.addOption(helpOption);
379 options.addOption(clusterOption);
380 options.addOption(hostOption);
381 options.addOption(portOption);
382 options.addOption(transDelayOption);
383 options.addOption(cmTypeOption);
384
385 options.addOptionGroup(optionGroup);
386
387 return options;
388 }
389
390 public static void printUsage(Options cliOptions)
391 {
392 HelpFormatter helpFormatter = new HelpFormatter();
393 helpFormatter.printHelp("java " + DummyProcess.class.getName(), cliOptions);
394 }
395
396 public static CommandLine processCommandLineArgs(String[] cliArgs)
397 throws Exception
398 {
399 CommandLineParser cliParser = new GnuParser();
400 Options cliOptions = constructCommandLineOptions();
401
402
403 try
404 {
405 return cliParser.parse(cliOptions, cliArgs);
406 } catch (ParseException pe)
407 {
408 System.err
409 .println("CommandLineClient: failed to parse command-line options: "
410 + pe.toString());
411 printUsage(cliOptions);
412 System.exit(1);
413 }
414 return null;
415 }
416
417 public static void main(String[] args) throws Exception
418 {
419 String cmType = "zk";
420 String zkConnectString = "localhost:2181";
421 String clusterName = "testCluster";
422 String instanceName = "localhost_8900";
423 String cvFileStr = null;
424
425 int delay = 0;
426
427 if (args.length > 0)
428 {
429 CommandLine cmd = processCommandLineArgs(args);
430 zkConnectString = cmd.getOptionValue(zkServer);
431 clusterName = cmd.getOptionValue(cluster);
432
433 String host = cmd.getOptionValue(hostAddress);
434 String portString = cmd.getOptionValue(hostPort);
435 int port = Integer.parseInt(portString);
436 instanceName = host + "_" + port;
437 cmType = cmd.getOptionValue(helixManagerType);
438 if (cmd.hasOption(transDelay))
439 {
440 try
441 {
442 delay = Integer.parseInt(cmd.getOptionValue(transDelay));
443 if (delay < 0)
444 {
445 throw new Exception("delay must be positive");
446 }
447 } catch (Exception e)
448 {
449 e.printStackTrace();
450 delay = 0;
451 }
452 }
453 }
454
455 logger.info("Dummy process started, instanceName:" + instanceName);
456
457 DummyProcess process = new DummyProcess(zkConnectString,
458 clusterName,
459 instanceName,
460 cmType,
461 delay);
462 HelixManager manager = process.start();
463
464 try
465 {
466 Thread.currentThread().join();
467 }
468 catch (InterruptedException e)
469 {
470
471 logger.info("participant:" + instanceName + ", " +
472 Thread.currentThread().getName() + " interrupted");
473
474
475
476
477 }
478 finally
479 {
480 if (manager != null)
481 {
482 manager.disconnect();
483 }
484 }
485
486 }
487 }