1 package org.apache.helix.recipes.rabbitmq;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.List;
23
24 import org.apache.helix.HelixManager;
25 import org.apache.helix.HelixManagerFactory;
26 import org.apache.helix.InstanceType;
27 import org.apache.helix.manager.zk.ZKHelixAdmin;
28 import org.apache.helix.manager.zk.ZNRecordSerializer;
29 import org.apache.helix.manager.zk.ZkClient;
30 import org.apache.helix.model.InstanceConfig;
31 import org.apache.helix.participant.StateMachineEngine;
32 import org.apache.helix.participant.statemachine.StateModel;
33
34 public class Consumer
35 {
36 private final String _zkAddr;
37 private final String _clusterName;
38 private final String _consumerId;
39 private final String _mqServer;
40 private HelixManager _manager = null;
41
42 public Consumer(String zkAddr, String clusterName, String consumerId, String mqServer)
43 {
44 _zkAddr = zkAddr;
45 _clusterName = clusterName;
46 _consumerId = consumerId;
47 _mqServer = mqServer;
48 }
49
50 public void connect()
51 {
52 try
53 {
54 _manager =
55 HelixManagerFactory.getZKHelixManager(_clusterName,
56 _consumerId,
57 InstanceType.PARTICIPANT,
58 _zkAddr);
59
60 StateMachineEngine stateMach = _manager.getStateMachineEngine();
61 ConsumerStateModelFactory modelFactory =
62 new ConsumerStateModelFactory(_consumerId, _mqServer);
63 stateMach.registerStateModelFactory(SetupConsumerCluster.DEFAULT_STATE_MODEL, modelFactory);
64
65 _manager.connect();
66
67 Thread.currentThread().join();
68 }
69 catch (InterruptedException e)
70 {
71 System.err.println(" [-] " + _consumerId + " is interrupted ...");
72 }
73 catch (Exception e)
74 {
75
76 e.printStackTrace();
77 }
78 finally
79 {
80 disconnect();
81 }
82 }
83
84 public void disconnect()
85 {
86 if (_manager != null)
87 {
88 _manager.disconnect();
89 }
90 }
91
92 public static void main(String[] args) throws Exception
93 {
94 if (args.length < 3)
95 {
96 System.err.println("USAGE: java Consumer zookeeperAddress (e.g. localhost:2181) consumerId (0-2), rabbitmqServer (e.g. localhost)");
97 System.exit(1);
98 }
99
100 final String zkAddr = args[0];
101 final String clusterName = SetupConsumerCluster.DEFAULT_CLUSTER_NAME;
102 final String consumerId = args[1];
103 final String mqServer = args[2];
104
105 ZkClient zkclient = null;
106 try
107 {
108
109 zkclient =
110 new ZkClient(zkAddr,
111 ZkClient.DEFAULT_SESSION_TIMEOUT,
112 ZkClient.DEFAULT_CONNECTION_TIMEOUT,
113 new ZNRecordSerializer());
114 ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
115
116 List<String> nodes = admin.getInstancesInCluster(clusterName);
117 if (!nodes.contains("consumer_" + consumerId))
118 {
119 InstanceConfig config = new InstanceConfig("consumer_" + consumerId);
120 config.setHostName("localhost");
121 config.setInstanceEnabled(true);
122 admin.addInstance(clusterName, config);
123 }
124
125
126 final Consumer consumer =
127 new Consumer(zkAddr, clusterName, "consumer_" + consumerId, mqServer);
128
129 Runtime.getRuntime().addShutdownHook(new Thread()
130 {
131 @Override
132 public void run()
133 {
134 System.out.println("Shutting down consumer_" + consumerId);
135 consumer.disconnect();
136 }
137 });
138
139 consumer.connect();
140 }
141 finally
142 {
143 if (zkclient != null)
144 {
145 zkclient.close();
146 }
147 }
148 }
149 }