1 package org.apache.helix.examples;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.File;
23
24 import org.apache.commons.cli.CommandLine;
25 import org.apache.commons.cli.CommandLineParser;
26 import org.apache.commons.cli.GnuParser;
27 import org.apache.commons.cli.HelpFormatter;
28 import org.apache.commons.cli.Option;
29 import org.apache.commons.cli.OptionBuilder;
30 import org.apache.commons.cli.OptionGroup;
31 import org.apache.commons.cli.Options;
32 import org.apache.commons.cli.ParseException;
33 import org.apache.helix.HelixManager;
34 import org.apache.helix.HelixManagerFactory;
35 import org.apache.helix.InstanceType;
36 import org.apache.helix.model.Message.MessageType;
37 import org.apache.helix.participant.StateMachineEngine;
38 import org.apache.helix.participant.statemachine.StateModel;
39 import org.apache.helix.participant.statemachine.StateModelFactory;
40 import org.apache.helix.tools.ClusterStateVerifier;
41
42 public class ExampleProcess
43 {
44
45 public static final String zkServer = "zkSvr";
46 public static final String cluster = "cluster";
47 public static final String hostAddress = "host";
48 public static final String hostPort = "port";
49 public static final String relayCluster = "relayCluster";
50 public static final String help = "help";
51 public static final String configFile = "configFile";
52 public static final String stateModel = "stateModelType";
53 public static final String transDelay = "transDelay";
54
55 private final String zkConnectString;
56 private final String clusterName;
57 private final String instanceName;
58 private final String stateModelType;
59 private HelixManager manager;
60
61
62
63 private StateModelFactory<StateModel> stateModelFactory;
64 private final int delay;
65
66 public ExampleProcess(String zkConnectString, String clusterName,
67 String instanceName, String file, String stateModel, int delay)
68 {
69 this.zkConnectString = zkConnectString;
70 this.clusterName = clusterName;
71 this.instanceName = instanceName;
72 stateModelType = stateModel;
73 this.delay = delay;
74 }
75
76 public void start() throws Exception
77 {
78 manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
79 InstanceType.PARTICIPANT, zkConnectString);
80
81 if ("MasterSlave".equalsIgnoreCase(stateModelType))
82 {
83 stateModelFactory = new MasterSlaveStateModelFactory(delay);
84 } else if ("OnlineOffline".equalsIgnoreCase(stateModelType))
85 {
86 stateModelFactory = new OnlineOfflineStateModelFactory(delay);
87 } else if ("LeaderStandby".equalsIgnoreCase(stateModelType))
88 {
89 stateModelFactory = new LeaderStandbyStateModelFactory(delay);
90 }
91
92
93
94
95 StateMachineEngine stateMach = manager.getStateMachineEngine();
96 stateMach.registerStateModelFactory(stateModelType, stateModelFactory);
97 manager.connect();
98 manager.getMessagingService().registerMessageHandlerFactory(
99 MessageType.STATE_TRANSITION.toString(), stateMach);
100 }
101
102 public void stop()
103 {
104 manager.disconnect();
105 }
106
107 @SuppressWarnings("static-access")
108 private static Options constructCommandLineOptions()
109 {
110 Option helpOption = OptionBuilder.withLongOpt(help)
111 .withDescription("Prints command-line options info").create();
112
113 Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
114 .withDescription("Provide zookeeper address").create();
115 zkServerOption.setArgs(1);
116 zkServerOption.setRequired(true);
117 zkServerOption.setArgName("ZookeeperServerAddress(Required)");
118
119 Option clusterOption = OptionBuilder.withLongOpt(cluster)
120 .withDescription("Provide cluster name").create();
121 clusterOption.setArgs(1);
122 clusterOption.setRequired(true);
123 clusterOption.setArgName("Cluster name (Required)");
124
125 Option hostOption = OptionBuilder.withLongOpt(hostAddress)
126 .withDescription("Provide host name").create();
127 hostOption.setArgs(1);
128 hostOption.setRequired(true);
129 hostOption.setArgName("Host name (Required)");
130
131 Option portOption = OptionBuilder.withLongOpt(hostPort)
132 .withDescription("Provide host port").create();
133 portOption.setArgs(1);
134 portOption.setRequired(true);
135 portOption.setArgName("Host port (Required)");
136
137 Option stateModelOption = OptionBuilder.withLongOpt(stateModel)
138 .withDescription("StateModel Type").create();
139 stateModelOption.setArgs(1);
140 stateModelOption.setRequired(true);
141 stateModelOption.setArgName("StateModel Type (Required)");
142
143
144 Option fileOption = OptionBuilder.withLongOpt(configFile)
145 .withDescription("Provide file to read states/messages").create();
146 fileOption.setArgs(1);
147 fileOption.setRequired(true);
148 fileOption.setArgName("File to read states/messages (Optional)");
149
150 Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
151 .withDescription("Provide state trans delay").create();
152 transDelayOption.setArgs(1);
153 transDelayOption.setRequired(false);
154 transDelayOption.setArgName("Delay time in state transition, in MS");
155
156 OptionGroup optionGroup = new OptionGroup();
157 optionGroup.addOption(zkServerOption);
158 optionGroup.addOption(fileOption);
159
160 Options options = new Options();
161 options.addOption(helpOption);
162
163 options.addOption(clusterOption);
164 options.addOption(hostOption);
165 options.addOption(portOption);
166 options.addOption(stateModelOption);
167 options.addOption(transDelayOption);
168
169 options.addOptionGroup(optionGroup);
170
171 return options;
172 }
173
174 public static void printUsage(Options cliOptions)
175 {
176 HelpFormatter helpFormatter = new HelpFormatter();
177 helpFormatter.printHelp("java " + ExampleProcess.class.getName(),
178 cliOptions);
179 }
180
181 public static CommandLine processCommandLineArgs(String[] cliArgs)
182 throws Exception
183 {
184 CommandLineParser cliParser = new GnuParser();
185 Options cliOptions = constructCommandLineOptions();
186 try
187 {
188 return cliParser.parse(cliOptions, cliArgs);
189 } catch (ParseException pe)
190 {
191 System.err
192 .println("CommandLineClient: failed to parse command-line options: "
193 + pe.toString());
194 printUsage(cliOptions);
195 System.exit(1);
196 }
197 return null;
198 }
199
200 public static void main(String[] args) throws Exception
201 {
202 String zkConnectString = "localhost:2181";
203 String clusterName = "storage-integration-cluster";
204 String instanceName = "localhost_8905";
205 String file = null;
206 String stateModelValue = "MasterSlave";
207 int delay = 0;
208 boolean skipZeroArgs = true;
209 if (!skipZeroArgs || args.length > 0)
210 {
211 CommandLine cmd = processCommandLineArgs(args);
212 zkConnectString = cmd.getOptionValue(zkServer);
213 clusterName = cmd.getOptionValue(cluster);
214
215 String host = cmd.getOptionValue(hostAddress);
216 String portString = cmd.getOptionValue(hostPort);
217 int port = Integer.parseInt(portString);
218 instanceName = host + "_" + port;
219
220 file = cmd.getOptionValue(configFile);
221 if (file != null)
222 {
223 File f = new File(file);
224 if (!f.exists())
225 {
226 System.err.println("static config file doesn't exist");
227 System.exit(1);
228 }
229 }
230
231 stateModelValue = cmd.getOptionValue(stateModel);
232 if (cmd.hasOption(transDelay))
233 {
234 try
235 {
236 delay = Integer.parseInt(cmd.getOptionValue(transDelay));
237 if (delay < 0)
238 {
239 throw new Exception("delay must be positive");
240 }
241 } catch (Exception e)
242 {
243 e.printStackTrace();
244 delay = 0;
245 }
246 }
247 }
248
249 System.out.println("Starting Process with ZK:" + zkConnectString);
250
251 ExampleProcess process = new ExampleProcess(zkConnectString, clusterName,
252 instanceName, file, stateModelValue, delay);
253
254 process.start();
255 Thread.currentThread().join();
256 }
257 }