View Javadoc

1   package org.apache.helix.recipes.rabbitmq;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  
24  import com.rabbitmq.client.Channel;
25  import com.rabbitmq.client.Connection;
26  import com.rabbitmq.client.ConnectionFactory;
27  import com.rabbitmq.client.QueueingConsumer;
28  
29  public class ConsumerThread extends Thread
30  {
31    private static final String EXCHANGE_NAME = "topic_logs";
32    private final String _partition;
33    private final String _mqServer;
34    private final String _consumerId;
35    
36    public ConsumerThread(String partition, String mqServer, String consumerId)
37    {
38      _partition = partition;
39      _mqServer = mqServer;
40      _consumerId = consumerId;
41    }
42  
43    @Override
44    public void run()
45    {
46      Connection connection = null;
47      try
48      {
49        ConnectionFactory factory = new ConnectionFactory();
50        factory.setHost(_mqServer);
51        connection = factory.newConnection();
52        Channel channel = connection.createChannel();
53  
54        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
55        String queueName = channel.queueDeclare().getQueue();
56  
57        String bindingKey = _partition;
58        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
59  
60        System.out.println(" [*] " + _consumerId + " Waiting for messages on " + bindingKey + ". To exit press CTRL+C");
61  
62        QueueingConsumer consumer = new QueueingConsumer(channel);
63        channel.basicConsume(queueName, true, consumer);
64  
65        while (true)
66        {
67          QueueingConsumer.Delivery delivery = consumer.nextDelivery();
68          String message = new String(delivery.getBody());
69          String routingKey = delivery.getEnvelope().getRoutingKey();
70  
71          System.out.println(" [x] " + _consumerId + " Received '" + routingKey + "':'" + message + "'");
72        }
73      } catch (InterruptedException e)
74      {
75        System.err.println(" [-] " + _consumerId + " on " + _partition + " is interrupted ...");
76      }
77      catch (Exception e)
78      {
79        e.printStackTrace();
80      } finally
81      {
82        if (connection != null)
83        {
84          try
85          {
86            connection.close();
87          } catch (IOException e)
88          {
89            // TODO Auto-generated catch block
90            e.printStackTrace();
91          }
92        }
93      }
94    }
95  }