View Javadoc

1   package org.apache.helix.mock.participant;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  
23  import org.apache.commons.cli.CommandLine;
24  import org.apache.commons.cli.CommandLineParser;
25  import org.apache.commons.cli.GnuParser;
26  import org.apache.commons.cli.HelpFormatter;
27  import org.apache.commons.cli.Option;
28  import org.apache.commons.cli.OptionBuilder;
29  import org.apache.commons.cli.OptionGroup;
30  import org.apache.commons.cli.Options;
31  import org.apache.commons.cli.ParseException;
32  import org.apache.helix.HelixManager;
33  import org.apache.helix.HelixManagerFactory;
34  import org.apache.helix.InstanceType;
35  import org.apache.helix.NotificationContext;
36  import org.apache.helix.model.Message;
37  import org.apache.helix.participant.StateMachineEngine;
38  import org.apache.helix.participant.statemachine.StateModel;
39  import org.apache.helix.participant.statemachine.StateModelFactory;
40  import org.apache.log4j.Logger;
41  
42  
43  public class DummyProcess
44  {
  // Shared log4j logger; also used by the nested dummy state models.
  private static final Logger logger = Logger.getLogger(DummyProcess.class);
  // Long names of the command-line options (passed as --<name> <value>).
  // ZooKeeper address option.
  public static final String zkServer = "zkSvr";
  // Cluster name option.
  public static final String cluster = "cluster";
  // Host name option; combined with the port to form the instance name in main().
  public static final String hostAddress = "host";
  // Host port option.
  public static final String hostPort = "port";
  // NOTE(review): not referenced anywhere else in this file.
  public static final String relayCluster = "relayCluster";
  // Help option.
  public static final String help = "help";
  // State-transition delay option, in milliseconds.
  public static final String transDelay = "transDelay";
  // Cluster manager type option; start() only accepts "zk".
  public static final String helixManagerType = "helixManagerType";
//  public static final String rootNamespace = "rootNamespace";

  // Connection settings captured by the constructor and used by start().
  private final String _zkConnectString;
  private final String _clusterName;
  private final String _instanceName;
  // Factory registered for the "MasterSlave" state model; assigned in start().
  private DummyStateModelFactory stateModelFactory;
//  private StateMachineEngine genericStateMachineHandler;


  // Delay handed to every state model factory; the constructor clamps it to >= 0.
  private int _transDelayInMs = 0;
  // Cluster manager type as supplied by the caller (field name spelling is historical).
  private final String _clusterMangerType;
65  
66    public DummyProcess(String zkConnectString,
67                        String clusterName,
68                        String instanceName,
69                        String clusterMangerType,
70                        int delay)
71    {
72      _zkConnectString = zkConnectString;
73      _clusterName = clusterName;
74      _instanceName = instanceName;
75      _clusterMangerType = clusterMangerType;
76      _transDelayInMs = delay > 0 ? delay : 0;
77    }
78  
79    static void sleep(long transDelay)
80    {
81      try
82      {
83        if (transDelay > 0)
84        {
85          Thread.sleep(transDelay);
86        }
87      }
88      catch (InterruptedException e)
89      {
90        // TODO Auto-generated catch block
91        e.printStackTrace();
92      }
93    }
94  
95    public HelixManager start() throws Exception
96    {
97      HelixManager manager = null;
98      // zk cluster manager
99      if (_clusterMangerType.equalsIgnoreCase("zk"))
100     {
101       manager = HelixManagerFactory.getZKHelixManager(_clusterName,
102                                                           _instanceName,
103                                                           InstanceType.PARTICIPANT,
104                                                           _zkConnectString);
105     }
106     else
107     {
108       throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType);
109     }
110 
111     stateModelFactory = new DummyStateModelFactory(_transDelayInMs);
112     DummyLeaderStandbyStateModelFactory stateModelFactory1 = new DummyLeaderStandbyStateModelFactory(_transDelayInMs);
113     DummyOnlineOfflineStateModelFactory stateModelFactory2 = new DummyOnlineOfflineStateModelFactory(_transDelayInMs);
114 //    genericStateMachineHandler = new StateMachineEngine();
115     StateMachineEngine stateMach = manager.getStateMachineEngine();
116     stateMach.registerStateModelFactory("MasterSlave", stateModelFactory);
117     stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory1);
118     stateMach.registerStateModelFactory("OnlineOffline", stateModelFactory2);
119 
120     manager.connect();
121 //    manager.getMessagingService().registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), genericStateMachineHandler);
122     return manager;
123   }
124 
125   public static class DummyStateModelFactory extends StateModelFactory<DummyStateModel>
126   {
127     int _delay;
128 
129     public DummyStateModelFactory(int delay)
130     {
131       _delay = delay;
132     }
133 
134     @Override
135     public DummyStateModel createNewStateModel(String stateUnitKey)
136     {
137       DummyStateModel model = new DummyStateModel();
138       model.setDelay(_delay);
139       return model;
140     }
141   }
142 
143   public static class DummyLeaderStandbyStateModelFactory extends StateModelFactory<DummyLeaderStandbyStateModel>
144   {
145     int _delay;
146 
147     public DummyLeaderStandbyStateModelFactory(int delay)
148     {
149       _delay = delay;
150     }
151 
152     @Override
153     public DummyLeaderStandbyStateModel createNewStateModel(String stateUnitKey)
154     {
155       DummyLeaderStandbyStateModel model = new DummyLeaderStandbyStateModel();
156       model.setDelay(_delay);
157       return model;
158     }
159   }
160 
161   public static class DummyOnlineOfflineStateModelFactory extends StateModelFactory<DummyOnlineOfflineStateModel>
162   {
163     int _delay;
164 
165     public DummyOnlineOfflineStateModelFactory(int delay)
166     {
167       _delay = delay;
168     }
169 
170     @Override
171     public DummyOnlineOfflineStateModel createNewStateModel(String stateUnitKey)
172     {
173       DummyOnlineOfflineStateModel model = new DummyOnlineOfflineStateModel();
174       model.setDelay(_delay);
175       return model;
176     }
177   }
178   public static class DummyStateModel extends StateModel
179   {
180     int _transDelay = 0;
181 
182     public void setDelay(int delay)
183     {
184       _transDelay = delay > 0 ? delay : 0;
185     }
186 
187     public void onBecomeSlaveFromOffline(Message message,
188         NotificationContext context)
189     {
190       String db = message.getPartitionName();
191       String instanceName = context.getManager().getInstanceName();
192       DummyProcess.sleep(_transDelay);
193 
194       logger.info("DummyStateModel.onBecomeSlaveFromOffline(), instance:" + instanceName
195                          + ", db:" + db);
196     }
197 
198     public void onBecomeSlaveFromMaster(Message message,
199         NotificationContext context)
200     {
201       DummyProcess.sleep(_transDelay);
202 
203       logger.info("DummyStateModel.onBecomeSlaveFromMaster()");
204 
205     }
206 
207     public void onBecomeMasterFromSlave(Message message,
208         NotificationContext context)
209     {
210       DummyProcess.sleep(_transDelay);
211 
212       logger.info("DummyStateModel.onBecomeMasterFromSlave()");
213 
214     }
215 
216     public void onBecomeOfflineFromSlave(Message message,
217         NotificationContext context)
218     {
219       DummyProcess.sleep(_transDelay);
220 
221       logger.info("DummyStateModel.onBecomeOfflineFromSlave()");
222 
223     }
224 
225     public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
226     {
227       DummyProcess.sleep(_transDelay);
228 
229       logger.info("DummyStateModel.onBecomeDroppedFromOffline()");
230 
231     }
232   }
233 
234 
235   public static class DummyOnlineOfflineStateModel extends StateModel
236   {
237     int _transDelay = 0;
238 
239     public void setDelay(int delay)
240     {
241       _transDelay = delay > 0 ? delay : 0;
242     }
243 
244     public void onBecomeOnlineFromOffline(Message message,
245         NotificationContext context)
246     {
247       String db = message.getPartitionName();
248       String instanceName = context.getManager().getInstanceName();
249       DummyProcess.sleep(_transDelay);
250 
251       logger.info("DummyStateModel.onBecomeOnlineFromOffline(), instance:" + instanceName
252                          + ", db:" + db);
253     }
254 
255     public void onBecomeOfflineFromOnline(Message message,
256         NotificationContext context)
257     {
258       DummyProcess.sleep(_transDelay);
259 
260       logger.info("DummyStateModel.onBecomeOfflineFromOnline()");
261 
262     }
263     public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
264     {
265       DummyProcess.sleep(_transDelay);
266 
267       logger.info("DummyStateModel.onBecomeDroppedFromOffline()");
268 
269     }
270   }
271 
272   public static class DummyLeaderStandbyStateModel extends StateModel
273   {
274     int _transDelay = 0;
275 
276     public void setDelay(int delay)
277     {
278       _transDelay = delay > 0 ? delay : 0;
279     }
280 
281     public void onBecomeLeaderFromStandby(Message message,
282         NotificationContext context)
283     {
284       String db = message.getPartitionName();
285       String instanceName = context.getManager().getInstanceName();
286       DummyProcess.sleep(_transDelay);
287       logger.info("DummyLeaderStandbyStateModel.onBecomeLeaderFromStandby(), instance:" + instanceName
288                          + ", db:" + db);
289     }
290 
291     public void onBecomeStandbyFromLeader(Message message,
292         NotificationContext context)
293     {
294       DummyProcess.sleep(_transDelay);
295 
296       logger.info("DummyLeaderStandbyStateModel.onBecomeStandbyFromLeader()");
297 
298     }
299     public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
300     {
301       DummyProcess.sleep(_transDelay);
302 
303       logger.info("DummyLeaderStandbyStateModel.onBecomeDroppedFromOffline()");
304 
305     }
306 
307     public void onBecomeStandbyFromOffline(Message message, NotificationContext context)
308     {
309       DummyProcess.sleep(_transDelay);
310 
311       logger.info("DummyLeaderStandbyStateModel.onBecomeStandbyFromOffline()");
312 
313     }
314 
315     public void onBecomeOfflineFromStandby(Message message, NotificationContext context)
316     {
317       DummyProcess.sleep(_transDelay);
318 
319       logger.info("DummyLeaderStandbyStateModel.onBecomeOfflineFromStandby()");
320 
321     }
322   }
323 
324   // TODO hack OptionBuilder is not thread safe
325   @SuppressWarnings("static-access")
326   synchronized private static Options constructCommandLineOptions()
327   {
328     Option helpOption = OptionBuilder.withLongOpt(help)
329         .withDescription("Prints command-line options info").create();
330 
331     Option clusterOption = OptionBuilder.withLongOpt(cluster)
332         .withDescription("Provide cluster name").create();
333     clusterOption.setArgs(1);
334     clusterOption.setRequired(true);
335     clusterOption.setArgName("Cluster name (Required)");
336 
337     Option hostOption = OptionBuilder.withLongOpt(hostAddress)
338         .withDescription("Provide host name").create();
339     hostOption.setArgs(1);
340     hostOption.setRequired(true);
341     hostOption.setArgName("Host name (Required)");
342 
343     Option portOption = OptionBuilder.withLongOpt(hostPort)
344         .withDescription("Provide host port").create();
345     portOption.setArgs(1);
346     portOption.setRequired(true);
347     portOption.setArgName("Host port (Required)");
348 
349     Option cmTypeOption = OptionBuilder.withLongOpt(helixManagerType)
350         .withDescription("Provide cluster manager type (e.g. 'zk', 'static-file', or 'dynamic-file'").create();
351     cmTypeOption.setArgs(1);
352     cmTypeOption.setRequired(true);
353     cmTypeOption.setArgName("Clsuter manager type (e.g. 'zk', 'static-file', or 'dynamic-file') (Required)");
354 
355     Option zkServerOption = OptionBuilder.withLongOpt(zkServer)
356       .withDescription("Provide zookeeper address").create();
357     zkServerOption.setArgs(1);
358     zkServerOption.setRequired(true);
359     zkServerOption.setArgName("ZookeeperServerAddress(Required for zk-based cluster manager)");
360 
361 //    Option rootNsOption = OptionBuilder.withLongOpt(rootNamespace)
362 //        .withDescription("Provide root namespace for dynamic-file based cluster manager").create();
363 //    rootNsOption.setArgs(1);
364 //    rootNsOption.setRequired(true);
365 //    rootNsOption.setArgName("Root namespace (Required for dynamic-file based cluster manager)");
366 
367 
368     Option transDelayOption = OptionBuilder.withLongOpt(transDelay)
369         .withDescription("Provide state trans delay").create();
370     transDelayOption.setArgs(1);
371     transDelayOption.setRequired(false);
372     transDelayOption.setArgName("Delay time in state transition, in MS");
373 
374     OptionGroup optionGroup = new OptionGroup();
375     optionGroup.addOption(zkServerOption);
376 
377     Options options = new Options();
378     options.addOption(helpOption);
379     options.addOption(clusterOption);
380     options.addOption(hostOption);
381     options.addOption(portOption);
382     options.addOption(transDelayOption);
383     options.addOption(cmTypeOption);
384 
385     options.addOptionGroup(optionGroup);
386 
387     return options;
388   }
389 
390   public static void printUsage(Options cliOptions)
391   {
392     HelpFormatter helpFormatter = new HelpFormatter();
393     helpFormatter.printHelp("java " + DummyProcess.class.getName(), cliOptions);
394   }
395 
396   public static CommandLine processCommandLineArgs(String[] cliArgs)
397       throws Exception
398   {
399     CommandLineParser cliParser = new GnuParser();
400     Options cliOptions = constructCommandLineOptions();
401     // CommandLine cmd = null;
402 
403     try
404     {
405       return cliParser.parse(cliOptions, cliArgs);
406     } catch (ParseException pe)
407     {
408       System.err
409           .println("CommandLineClient: failed to parse command-line options: "
410               + pe.toString());
411       printUsage(cliOptions);
412       System.exit(1);
413     }
414     return null;
415   }
416 
417   public static void main(String[] args) throws Exception
418   {
419     String cmType = "zk";
420     String zkConnectString = "localhost:2181";
421     String clusterName = "testCluster";
422     String instanceName = "localhost_8900";
423     String cvFileStr = null;
424 //    String rootNs = null;
425     int delay = 0;
426 
427     if (args.length > 0)
428     {
429       CommandLine cmd = processCommandLineArgs(args);
430       zkConnectString = cmd.getOptionValue(zkServer);
431       clusterName = cmd.getOptionValue(cluster);
432 
433       String host = cmd.getOptionValue(hostAddress);
434       String portString = cmd.getOptionValue(hostPort);
435       int port = Integer.parseInt(portString);
436       instanceName = host + "_" + port;
437       cmType = cmd.getOptionValue(helixManagerType);
438       if (cmd.hasOption(transDelay))
439       {
440         try
441         {
442           delay = Integer.parseInt(cmd.getOptionValue(transDelay));
443           if (delay < 0)
444           {
445             throw new Exception("delay must be positive");
446           }
447         } catch (Exception e)
448         {
449           e.printStackTrace();
450           delay = 0;
451         }
452       }
453     }
454     // Espresso_driver.py will consume this
455     logger.info("Dummy process started, instanceName:" + instanceName);
456 
457     DummyProcess process = new DummyProcess(zkConnectString,
458                                             clusterName,
459                                             instanceName,
460                                             cmType,
461                                             delay);
462     HelixManager manager = process.start();
463 
464     try
465     {
466       Thread.currentThread().join();
467     }
468     catch (InterruptedException e)
469     {
470       // ClusterManagerFactory.disconnectManagers(instanceName);
471       logger.info("participant:" + instanceName + ", " +
472                    Thread.currentThread().getName() + " interrupted");
473 //      if (manager != null)
474 //      {
475 //        manager.disconnect();
476 //      }
477     }
478     finally
479     {
480       if (manager != null)
481       {
482         manager.disconnect();
483       }
484     }
485 
486   }
487 }