1 package org.apache.helix.controller;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 import java.util.Arrays;
37
38 import org.I0Itec.zkclient.exception.ZkInterruptedException;
39 import org.apache.commons.cli.CommandLine;
40 import org.apache.commons.cli.CommandLineParser;
41 import org.apache.commons.cli.GnuParser;
42 import org.apache.commons.cli.HelpFormatter;
43 import org.apache.commons.cli.Option;
44 import org.apache.commons.cli.OptionBuilder;
45 import org.apache.commons.cli.Options;
46 import org.apache.commons.cli.ParseException;
47 import org.apache.helix.HelixManager;
48 import org.apache.helix.HelixManagerFactory;
49 import org.apache.helix.InstanceType;
50 import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
51 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
52 import org.apache.helix.participant.StateMachineEngine;
53 import org.apache.log4j.Logger;
54
55
56 public class HelixControllerMain
57 {
58 public static final String zkServerAddress = "zkSvr";
59 public static final String cluster = "cluster";
60 public static final String help = "help";
61 public static final String mode = "mode";
62 public static final String propertyTransferServicePort = "propertyTransferPort";
63 public static final String name = "controllerName";
64 public static final String STANDALONE = "STANDALONE";
65 public static final String DISTRIBUTED = "DISTRIBUTED";
66 private static final Logger logger = Logger.getLogger(HelixControllerMain.class);
67
68
69 @SuppressWarnings("static-access")
70 synchronized private static Options constructCommandLineOptions()
71 {
72 Option helpOption = OptionBuilder.withLongOpt(help)
73 .withDescription("Prints command-line options info").create();
74
75 Option zkServerOption = OptionBuilder.withLongOpt(zkServerAddress)
76 .withDescription("Provide zookeeper address").create();
77 zkServerOption.setArgs(1);
78 zkServerOption.setRequired(true);
79 zkServerOption.setArgName("ZookeeperServerAddress(Required)");
80
81 Option clusterOption = OptionBuilder.withLongOpt(cluster)
82 .withDescription("Provide cluster name").create();
83 clusterOption.setArgs(1);
84 clusterOption.setRequired(true);
85 clusterOption.setArgName("Cluster name (Required)");
86
87 Option modeOption = OptionBuilder
88 .withLongOpt(mode)
89 .withDescription(
90 "Provide cluster controller mode (Optional): STANDALONE (default) or DISTRIBUTED")
91 .create();
92 modeOption.setArgs(1);
93 modeOption.setRequired(false);
94 modeOption.setArgName("Cluster controller mode (Optional)");
95
96 Option controllerNameOption = OptionBuilder.withLongOpt(name)
97 .withDescription("Provide cluster controller name (Optional)").create();
98 controllerNameOption.setArgs(1);
99 controllerNameOption.setRequired(false);
100 controllerNameOption.setArgName("Cluster controller name (Optional)");
101
102 Option portOption = OptionBuilder
103 .withLongOpt(propertyTransferServicePort)
104 .withDescription(
105 "Webservice port for ZkProperty controller transfer")
106 .create();
107 portOption.setArgs(1);
108 portOption.setRequired(false);
109 portOption.setArgName("Cluster controller property transfer port (Optional)");
110
111 Options options = new Options();
112 options.addOption(helpOption);
113 options.addOption(zkServerOption);
114 options.addOption(clusterOption);
115 options.addOption(modeOption);
116 options.addOption(portOption);
117 options.addOption(controllerNameOption);
118
119 return options;
120 }
121
122 public static void printUsage(Options cliOptions)
123 {
124 HelpFormatter helpFormatter = new HelpFormatter();
125 helpFormatter.setWidth(1000);
126 helpFormatter.printHelp("java " + HelixControllerMain.class.getName(), cliOptions);
127 }
128
129 public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception
130 {
131 CommandLineParser cliParser = new GnuParser();
132 Options cliOptions = constructCommandLineOptions();
133
134 try
135 {
136 return cliParser.parse(cliOptions, cliArgs);
137 } catch (ParseException pe)
138 {
139 logger.error("fail to parse command-line options. cliArgs: " + Arrays.toString(cliArgs), pe);
140 printUsage(cliOptions);
141 System.exit(1);
142 }
143 return null;
144 }
145
146 public static void addListenersToController(HelixManager manager,
147 GenericHelixController controller)
148 {
149 try
150 {
151 manager.addConfigChangeListener(controller);
152 manager.addLiveInstanceChangeListener(controller);
153 manager.addIdealStateChangeListener(controller);
154
155
156 manager.addControllerListener(controller);
157 } catch (ZkInterruptedException e)
158 {
159 logger
160 .warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "
161 + e);
162 } catch (Exception e)
163 {
164 logger.error("Error when creating HelixManagerContollerMonitor", e);
165 }
166 }
167
168 public static HelixManager startHelixController(final String zkConnectString,
169 final String clusterName, final String controllerName, final String controllerMode)
170 {
171 HelixManager manager = null;
172 try
173 {
174 if (controllerMode.equalsIgnoreCase(STANDALONE))
175 {
176 manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
177 InstanceType.CONTROLLER, zkConnectString);
178 manager.connect();
179 } else if (controllerMode.equalsIgnoreCase(DISTRIBUTED))
180 {
181 manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
182 InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
183
184 DistClusterControllerStateModelFactory stateModelFactory = new DistClusterControllerStateModelFactory(
185 zkConnectString);
186
187
188
189 StateMachineEngine stateMach = manager.getStateMachineEngine();
190 stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
191
192
193 manager.connect();
194 } else
195 {
196 logger.error("cluster controller mode:" + controllerMode + " NOT supported");
197
198
199
200 }
201 } catch (Exception e)
202 {
203 logger.error("Exception while starting controller",e);
204 }
205
206 return manager;
207 }
208
209 public static void main(String[] args) throws Exception
210 {
211
212
213
214
215
216
217
218
219 CommandLine cmd = processCommandLineArgs(args);
220 String zkConnectString = cmd.getOptionValue(zkServerAddress);
221 String clusterName = cmd.getOptionValue(cluster);
222 String controllerMode = STANDALONE;
223 String controllerName = null;
224 int propertyTransServicePort = -1;
225
226 if (cmd.hasOption(mode))
227 {
228 controllerMode = cmd.getOptionValue(mode);
229 }
230
231 if(cmd.hasOption(propertyTransferServicePort))
232 {
233 propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
234 }
235 if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name))
236 {
237 throw new IllegalArgumentException(
238 "A unique cluster controller name is required in DISTRIBUTED mode");
239 }
240
241 controllerName = cmd.getOptionValue(name);
242
243
244 logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
245 + clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);
246
247 if (propertyTransServicePort > 0)
248 {
249 ZKPropertyTransferServer.getInstance().init(propertyTransServicePort, zkConnectString);
250 }
251
252 HelixManager manager = startHelixController(zkConnectString, clusterName, controllerName,
253 controllerMode);
254 try
255 {
256 Thread.currentThread().join();
257 }
258 catch (InterruptedException e)
259 {
260 logger.info("controller:" + controllerName + ", " + Thread.currentThread().getName()
261 + " interrupted");
262 }
263 finally
264 {
265 manager.disconnect();
266 ZKPropertyTransferServer.getInstance().shutdown();
267 }
268
269 }
270 }