View Javadoc

1   package org.apache.helix.controller;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
/**
 * Starts a cluster manager controller.
 * The cluster manager controller has two modes:
 * 1) stand-alone mode: in this mode each controller is given a list of clusters
 *  and competes via leader election to become the controller for any of those clusters.
 *  If a controller fails to become the leader of a given cluster, it remains a standby
 *  and re-runs the leader election when the current leader fails.
 *
 * 2) distributed mode: in this mode each controller first joins, as a participant,
 *   a special CONTROLLER_CLUSTER. Leader election happens in this special
 *   cluster. The controller that becomes the leader directs all controllers (including itself)
 *   to become leaders of other clusters.
 */
35  
36  import java.util.Arrays;
37  
38  import org.I0Itec.zkclient.exception.ZkInterruptedException;
39  import org.apache.commons.cli.CommandLine;
40  import org.apache.commons.cli.CommandLineParser;
41  import org.apache.commons.cli.GnuParser;
42  import org.apache.commons.cli.HelpFormatter;
43  import org.apache.commons.cli.Option;
44  import org.apache.commons.cli.OptionBuilder;
45  import org.apache.commons.cli.Options;
46  import org.apache.commons.cli.ParseException;
47  import org.apache.helix.HelixManager;
48  import org.apache.helix.HelixManagerFactory;
49  import org.apache.helix.InstanceType;
50  import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
51  import org.apache.helix.participant.DistClusterControllerStateModelFactory;
52  import org.apache.helix.participant.StateMachineEngine;
53  import org.apache.log4j.Logger;
54  
55  
56  public class HelixControllerMain
57  {
58    public static final String zkServerAddress = "zkSvr";
59    public static final String cluster = "cluster";
60    public static final String help = "help";
61    public static final String mode = "mode";
62    public static final String propertyTransferServicePort = "propertyTransferPort";
63    public static final String name = "controllerName";
64    public static final String STANDALONE = "STANDALONE";
65    public static final String DISTRIBUTED = "DISTRIBUTED";
66    private static final Logger logger = Logger.getLogger(HelixControllerMain.class);
67  
68    // hack: OptionalBuilder is not thread safe
69    @SuppressWarnings("static-access")
70    synchronized private static Options constructCommandLineOptions()
71    {
72      Option helpOption = OptionBuilder.withLongOpt(help)
73          .withDescription("Prints command-line options info").create();
74  
75      Option zkServerOption = OptionBuilder.withLongOpt(zkServerAddress)
76          .withDescription("Provide zookeeper address").create();
77      zkServerOption.setArgs(1);
78      zkServerOption.setRequired(true);
79      zkServerOption.setArgName("ZookeeperServerAddress(Required)");
80  
81      Option clusterOption = OptionBuilder.withLongOpt(cluster)
82          .withDescription("Provide cluster name").create();
83      clusterOption.setArgs(1);
84      clusterOption.setRequired(true);
85      clusterOption.setArgName("Cluster name (Required)");
86  
87      Option modeOption = OptionBuilder
88          .withLongOpt(mode)
89          .withDescription(
90              "Provide cluster controller mode (Optional): STANDALONE (default) or DISTRIBUTED")
91          .create();
92      modeOption.setArgs(1);
93      modeOption.setRequired(false);
94      modeOption.setArgName("Cluster controller mode (Optional)");
95  
96      Option controllerNameOption = OptionBuilder.withLongOpt(name)
97          .withDescription("Provide cluster controller name (Optional)").create();
98      controllerNameOption.setArgs(1);
99      controllerNameOption.setRequired(false);
100     controllerNameOption.setArgName("Cluster controller name (Optional)");
101     
102     Option portOption = OptionBuilder
103         .withLongOpt(propertyTransferServicePort)
104         .withDescription(
105             "Webservice port for ZkProperty controller transfer")
106         .create();
107     portOption.setArgs(1);
108     portOption.setRequired(false);
109     portOption.setArgName("Cluster controller property transfer port (Optional)");
110     
111     Options options = new Options();
112     options.addOption(helpOption);
113     options.addOption(zkServerOption);
114     options.addOption(clusterOption);
115     options.addOption(modeOption);
116     options.addOption(portOption);
117     options.addOption(controllerNameOption);
118 
119     return options;
120   }
121 
122   public static void printUsage(Options cliOptions)
123   {
124     HelpFormatter helpFormatter = new HelpFormatter();
125     helpFormatter.setWidth(1000);
126     helpFormatter.printHelp("java " + HelixControllerMain.class.getName(), cliOptions);
127   }
128 
129   public static CommandLine processCommandLineArgs(String[] cliArgs) throws Exception
130   {
131     CommandLineParser cliParser = new GnuParser();
132     Options cliOptions = constructCommandLineOptions();
133 
134     try
135     {
136       return cliParser.parse(cliOptions, cliArgs);
137     } catch (ParseException pe)
138     {
139       logger.error("fail to parse command-line options. cliArgs: " + Arrays.toString(cliArgs), pe);
140       printUsage(cliOptions);
141       System.exit(1);
142     }
143     return null;
144   }
145 
146   public static void addListenersToController(HelixManager manager,
147       GenericHelixController controller)
148   {
149     try
150     {
151       manager.addConfigChangeListener(controller);
152       manager.addLiveInstanceChangeListener(controller);
153       manager.addIdealStateChangeListener(controller);
154       // no need for controller to listen on external-view
155       // manager.addExternalViewChangeListener(controller);
156       manager.addControllerListener(controller);
157     } catch (ZkInterruptedException e)
158     {
159       logger
160           .warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "
161               + e);
162     } catch (Exception e)
163     {
164       logger.error("Error when creating HelixManagerContollerMonitor", e);
165     }
166   }
167 
168   public static HelixManager startHelixController(final String zkConnectString,
169       final String clusterName, final String controllerName, final String controllerMode)
170   {
171     HelixManager manager = null;
172     try
173     {
174       if (controllerMode.equalsIgnoreCase(STANDALONE))
175       {
176         manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
177             InstanceType.CONTROLLER, zkConnectString);
178         manager.connect();
179       } else if (controllerMode.equalsIgnoreCase(DISTRIBUTED))
180       {
181         manager = HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
182             InstanceType.CONTROLLER_PARTICIPANT, zkConnectString);
183 
184         DistClusterControllerStateModelFactory stateModelFactory = new DistClusterControllerStateModelFactory(
185             zkConnectString);
186 
187         // StateMachineEngine genericStateMachineHandler = new
188         // StateMachineEngine();
189         StateMachineEngine stateMach = manager.getStateMachineEngine();
190         stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
191         // manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
192         // genericStateMachineHandler);
193         manager.connect();
194       } else
195       {
196         logger.error("cluster controller mode:" + controllerMode + " NOT supported");
197         // throw new
198         // IllegalArgumentException("Unsupported cluster controller mode:" +
199         // controllerMode);
200       }
201     } catch (Exception e)
202     {
203       logger.error("Exception while starting controller",e);
204     }
205 
206     return manager;
207   }
208 
209   public static void main(String[] args) throws Exception
210   {
211     // read the config;
212     // check if the this process is the master wait indefinitely
213     // other approach is always process the events but when updating the zk
214     // check if this is master.
215     // This is difficult to get right
216     // get the clusters to manage
217     // for each cluster create a manager
218     // add the respective listeners for each manager
219     CommandLine cmd = processCommandLineArgs(args);
220     String zkConnectString = cmd.getOptionValue(zkServerAddress);
221     String clusterName = cmd.getOptionValue(cluster);
222     String controllerMode = STANDALONE;
223     String controllerName = null;
224     int propertyTransServicePort = -1;
225     
226     if (cmd.hasOption(mode))
227     {
228       controllerMode = cmd.getOptionValue(mode);
229     }
230     
231     if(cmd.hasOption(propertyTransferServicePort))
232     {
233         propertyTransServicePort = Integer.parseInt(cmd.getOptionValue(propertyTransferServicePort));
234     }
235     if (controllerMode.equalsIgnoreCase(DISTRIBUTED) && !cmd.hasOption(name))
236     {
237       throw new IllegalArgumentException(
238           "A unique cluster controller name is required in DISTRIBUTED mode");
239     }
240 
241     controllerName = cmd.getOptionValue(name);
242 
243     // Espresso_driver.py will consume this
244     logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
245         + clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);
246 
247     if (propertyTransServicePort > 0)
248     {
249       ZKPropertyTransferServer.getInstance().init(propertyTransServicePort, zkConnectString);
250     }
251     
252     HelixManager manager = startHelixController(zkConnectString, clusterName, controllerName,
253         controllerMode);
254     try
255     {
256       Thread.currentThread().join();
257     } 
258     catch (InterruptedException e)
259     {
260       logger.info("controller:" + controllerName + ", " + Thread.currentThread().getName()
261           + " interrupted");
262     }
263     finally
264     {
265       manager.disconnect();
266       ZKPropertyTransferServer.getInstance().shutdown();
267     }
268 
269   }
270 }