View Javadoc

1   package org.apache.helix.examples;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  
24  import org.apache.commons.cli.CommandLine;
25  import org.apache.commons.cli.CommandLineParser;
26  import org.apache.commons.cli.GnuParser;
27  import org.apache.commons.cli.HelpFormatter;
28  import org.apache.commons.cli.Option;
29  import org.apache.commons.cli.OptionBuilder;
30  import org.apache.commons.cli.OptionGroup;
31  import org.apache.commons.cli.Options;
32  import org.apache.commons.cli.ParseException;
33  import org.apache.helix.HelixManager;
34  import org.apache.helix.HelixManagerFactory;
35  import org.apache.helix.InstanceType;
36  import org.apache.helix.model.Message.MessageType;
37  import org.apache.helix.participant.StateMachineEngine;
38  import org.apache.helix.participant.statemachine.StateModel;
39  import org.apache.helix.participant.statemachine.StateModelFactory;
40  import org.apache.helix.tools.ClusterStateVerifier;
41  
42  public class ExampleProcess
43  {
44  
45    public static final String zkServer = "zkSvr";
46    public static final String cluster = "cluster";
47    public static final String hostAddress = "host";
48    public static final String hostPort = "port";
49    public static final String relayCluster = "relayCluster";
50    public static final String help = "help";
51    public static final String configFile = "configFile";
52    public static final String stateModel = "stateModelType";
53    public static final String transDelay = "transDelay";
54  
55    private final String zkConnectString;
56    private final String clusterName;
57    private final String instanceName;
58    private final String stateModelType;
59    private HelixManager manager;
60  
61    // private StateMachineEngine genericStateMachineHandler;
62  
63    private StateModelFactory<StateModel> stateModelFactory;
64    private final int delay;
65  
66    public ExampleProcess(String zkConnectString, String clusterName,
67        String instanceName, String file, String stateModel, int delay)
68    {
69      this.zkConnectString = zkConnectString;
70      this.clusterName = clusterName;
71      this.instanceName = instanceName;
72      stateModelType = stateModel;
73      this.delay = delay;
74    }
75  
76    public void start() throws Exception
77    {
78      manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
79          InstanceType.PARTICIPANT, zkConnectString);
80      
81      if ("MasterSlave".equalsIgnoreCase(stateModelType))
82      {
83        stateModelFactory = new MasterSlaveStateModelFactory(delay);
84      } else if ("OnlineOffline".equalsIgnoreCase(stateModelType))
85      {
86        stateModelFactory = new OnlineOfflineStateModelFactory(delay);
87      } else if ("LeaderStandby".equalsIgnoreCase(stateModelType))
88      {
89        stateModelFactory = new LeaderStandbyStateModelFactory(delay);
90      }
91      // genericStateMachineHandler = new StateMachineEngine();
92      // genericStateMachineHandler.registerStateModelFactory(stateModelType,
93      // stateModelFactory);
94  
95      StateMachineEngine stateMach = manager.getStateMachineEngine();
96      stateMach.registerStateModelFactory(stateModelType, stateModelFactory);
97      manager.connect();
98      manager.getMessagingService().registerMessageHandlerFactory(
99          MessageType.STATE_TRANSITION.toString(), stateMach);
100   }
101 
102   public void stop()
103   {
104     manager.disconnect();
105   }
106 
107   @SuppressWarnings("static-access")
108   private static Options constructCommandLineOptions()
109   {
110     Option helpOption = OptionBuilder.withLongOpt(help)
111         .withDescription("Prints command-line options info").create();
112 
113     Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
114         .withDescription("Provide zookeeper address").create();
115     zkServerOption.setArgs(1);
116     zkServerOption.setRequired(true);
117     zkServerOption.setArgName("ZookeeperServerAddress(Required)");
118 
119     Option clusterOption = OptionBuilder.withLongOpt(cluster)
120         .withDescription("Provide cluster name").create();
121     clusterOption.setArgs(1);
122     clusterOption.setRequired(true);
123     clusterOption.setArgName("Cluster name (Required)");
124 
125     Option hostOption = OptionBuilder.withLongOpt(hostAddress)
126         .withDescription("Provide host name").create();
127     hostOption.setArgs(1);
128     hostOption.setRequired(true);
129     hostOption.setArgName("Host name (Required)");
130 
131     Option portOption = OptionBuilder.withLongOpt(hostPort)
132         .withDescription("Provide host port").create();
133     portOption.setArgs(1);
134     portOption.setRequired(true);
135     portOption.setArgName("Host port (Required)");
136 
137     Option stateModelOption = OptionBuilder.withLongOpt(stateModel)
138         .withDescription("StateModel Type").create();
139     stateModelOption.setArgs(1);
140     stateModelOption.setRequired(true);
141     stateModelOption.setArgName("StateModel Type (Required)");
142 
143     // add an option group including either --zkSvr or --configFile
144     Option fileOption = OptionBuilder.withLongOpt(configFile)
145         .withDescription("Provide file to read states/messages").create();
146     fileOption.setArgs(1);
147     fileOption.setRequired(true);
148     fileOption.setArgName("File to read states/messages (Optional)");
149 
150     Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
151         .withDescription("Provide state trans delay").create();
152     transDelayOption.setArgs(1);
153     transDelayOption.setRequired(false);
154     transDelayOption.setArgName("Delay time in state transition, in MS");
155 
156     OptionGroup optionGroup = new OptionGroup();
157     optionGroup.addOption(zkServerOption);
158     optionGroup.addOption(fileOption);
159 
160     Options options = new Options();
161     options.addOption(helpOption);
162     // options.addOption(zkServerOption);
163     options.addOption(clusterOption);
164     options.addOption(hostOption);
165     options.addOption(portOption);
166     options.addOption(stateModelOption);
167     options.addOption(transDelayOption);
168 
169     options.addOptionGroup(optionGroup);
170 
171     return options;
172   }
173 
174   public static void printUsage(Options cliOptions)
175   {
176     HelpFormatter helpFormatter = new HelpFormatter();
177     helpFormatter.printHelp("java " + ExampleProcess.class.getName(),
178         cliOptions);
179   }
180 
181   public static CommandLine processCommandLineArgs(String[] cliArgs)
182       throws Exception
183   {
184     CommandLineParser cliParser = new GnuParser();
185     Options cliOptions = constructCommandLineOptions();
186     try
187     {
188       return cliParser.parse(cliOptions, cliArgs);
189     } catch (ParseException pe)
190     {
191       System.err
192           .println("CommandLineClient: failed to parse command-line options: "
193               + pe.toString());
194       printUsage(cliOptions);
195       System.exit(1);
196     }
197     return null;
198   }
199 
200   public static void main(String[] args) throws Exception
201   {
202     String zkConnectString = "localhost:2181";
203     String clusterName = "storage-integration-cluster";
204     String instanceName = "localhost_8905";
205     String file = null;
206     String stateModelValue = "MasterSlave";
207     int delay = 0;
208     boolean skipZeroArgs = true;// false is for dev testing
209     if (!skipZeroArgs || args.length > 0)
210     {
211       CommandLine cmd = processCommandLineArgs(args);
212       zkConnectString = cmd.getOptionValue(zkServer);
213       clusterName = cmd.getOptionValue(cluster);
214 
215       String host = cmd.getOptionValue(hostAddress);
216       String portString = cmd.getOptionValue(hostPort);
217       int port = Integer.parseInt(portString);
218       instanceName = host + "_" + port;
219 
220       file = cmd.getOptionValue(configFile);
221       if (file != null)
222       {
223         File f = new File(file);
224         if (!f.exists())
225         {
226           System.err.println("static config file doesn't exist");
227           System.exit(1);
228         }
229       }
230 
231       stateModelValue = cmd.getOptionValue(stateModel);
232       if (cmd.hasOption(transDelay))
233       {
234         try
235         {
236           delay = Integer.parseInt(cmd.getOptionValue(transDelay));
237           if (delay < 0)
238           {
239             throw new Exception("delay must be positive");
240           }
241         } catch (Exception e)
242         {
243           e.printStackTrace();
244           delay = 0;
245         }
246       }
247     }
248     // Espresso_driver.py will consume this
249     System.out.println("Starting Process with ZK:" + zkConnectString);
250 
251     ExampleProcess process = new ExampleProcess(zkConnectString, clusterName,
252         instanceName, file, stateModelValue, delay);
253 
254     process.start();
255     Thread.currentThread().join();
256   }
257 }