1 package org.apache.helix.recipes.rabbitmq;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23
24 import com.rabbitmq.client.Channel;
25 import com.rabbitmq.client.Connection;
26 import com.rabbitmq.client.ConnectionFactory;
27 import com.rabbitmq.client.QueueingConsumer;
28
29 public class ConsumerThread extends Thread
30 {
31 private static final String EXCHANGE_NAME = "topic_logs";
32 private final String _partition;
33 private final String _mqServer;
34 private final String _consumerId;
35
36 public ConsumerThread(String partition, String mqServer, String consumerId)
37 {
38 _partition = partition;
39 _mqServer = mqServer;
40 _consumerId = consumerId;
41 }
42
43 @Override
44 public void run()
45 {
46 Connection connection = null;
47 try
48 {
49 ConnectionFactory factory = new ConnectionFactory();
50 factory.setHost(_mqServer);
51 connection = factory.newConnection();
52 Channel channel = connection.createChannel();
53
54 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
55 String queueName = channel.queueDeclare().getQueue();
56
57 String bindingKey = _partition;
58 channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
59
60 System.out.println(" [*] " + _consumerId + " Waiting for messages on " + bindingKey + ". To exit press CTRL+C");
61
62 QueueingConsumer consumer = new QueueingConsumer(channel);
63 channel.basicConsume(queueName, true, consumer);
64
65 while (true)
66 {
67 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
68 String message = new String(delivery.getBody());
69 String routingKey = delivery.getEnvelope().getRoutingKey();
70
71 System.out.println(" [x] " + _consumerId + " Received '" + routingKey + "':'" + message + "'");
72 }
73 } catch (InterruptedException e)
74 {
75 System.err.println(" [-] " + _consumerId + " on " + _partition + " is interrupted ...");
76 }
77 catch (Exception e)
78 {
79 e.printStackTrace();
80 } finally
81 {
82 if (connection != null)
83 {
84 try
85 {
86 connection.close();
87 } catch (IOException e)
88 {
89
90 e.printStackTrace();
91 }
92 }
93 }
94 }
95 }