View Javadoc

1   package org.apache.helix.recipes.rabbitmq;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.List;
23  
24  import org.apache.helix.HelixManager;
25  import org.apache.helix.HelixManagerFactory;
26  import org.apache.helix.InstanceType;
27  import org.apache.helix.manager.zk.ZKHelixAdmin;
28  import org.apache.helix.manager.zk.ZNRecordSerializer;
29  import org.apache.helix.manager.zk.ZkClient;
30  import org.apache.helix.model.InstanceConfig;
31  import org.apache.helix.participant.StateMachineEngine;
32  import org.apache.helix.participant.statemachine.StateModel;
33  
34  public class Consumer
35  {
36    private final String _zkAddr;
37    private final String _clusterName;
38    private final String _consumerId;
39    private final String _mqServer;
40    private HelixManager _manager = null;
41  
42    public Consumer(String zkAddr, String clusterName, String consumerId, String mqServer)
43    {
44      _zkAddr = zkAddr;
45      _clusterName = clusterName;
46      _consumerId = consumerId;
47      _mqServer = mqServer;
48    }
49  
50    public void connect()
51    {
52      try
53      {
54        _manager =
55            HelixManagerFactory.getZKHelixManager(_clusterName,
56                                                  _consumerId,
57                                                  InstanceType.PARTICIPANT,
58                                                  _zkAddr);
59  
60        StateMachineEngine stateMach = _manager.getStateMachineEngine();
61        ConsumerStateModelFactory modelFactory =
62            new ConsumerStateModelFactory(_consumerId, _mqServer);
63        stateMach.registerStateModelFactory(SetupConsumerCluster.DEFAULT_STATE_MODEL, modelFactory);
64  
65        _manager.connect();
66  
67        Thread.currentThread().join();
68      }
69      catch (InterruptedException e)
70      {
71        System.err.println(" [-] " + _consumerId + " is interrupted ...");
72      }
73      catch (Exception e)
74      {
75        // TODO Auto-generated catch block
76        e.printStackTrace();
77      }
78      finally
79      {
80        disconnect();
81      }
82    }
83  
84    public void disconnect()
85    {
86      if (_manager != null)
87      {
88        _manager.disconnect();
89      }
90    }
91  
92    public static void main(String[] args) throws Exception
93    {
94      if (args.length < 3)
95      {
96        System.err.println("USAGE: java Consumer zookeeperAddress (e.g. localhost:2181) consumerId (0-2), rabbitmqServer (e.g. localhost)");
97        System.exit(1);
98      }
99  
100     final String zkAddr = args[0];
101     final String clusterName = SetupConsumerCluster.DEFAULT_CLUSTER_NAME;
102     final String consumerId = args[1];
103     final String mqServer = args[2];
104 
105     ZkClient zkclient = null;
106     try
107     {
108       // add node to cluster if not already added
109       zkclient =
110           new ZkClient(zkAddr,
111                        ZkClient.DEFAULT_SESSION_TIMEOUT,
112                        ZkClient.DEFAULT_CONNECTION_TIMEOUT,
113                        new ZNRecordSerializer());
114       ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
115 
116       List<String> nodes = admin.getInstancesInCluster(clusterName);
117       if (!nodes.contains("consumer_" + consumerId))
118       {
119         InstanceConfig config = new InstanceConfig("consumer_" + consumerId);
120         config.setHostName("localhost");
121         config.setInstanceEnabled(true);
122         admin.addInstance(clusterName, config);
123       }
124 
125       // start consumer
126       final Consumer consumer =
127           new Consumer(zkAddr, clusterName, "consumer_" + consumerId, mqServer);
128 
129       Runtime.getRuntime().addShutdownHook(new Thread()
130       {
131         @Override
132         public void run()
133         {
134           System.out.println("Shutting down consumer_" + consumerId);
135           consumer.disconnect();
136         }
137       });
138 
139       consumer.connect();
140     }
141     finally
142     {
143       if (zkclient != null)
144       {
145         zkclient.close();
146       }
147     }
148   }
149 }