1 package org.apache.helix.recipes.rabbitmq;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import com.rabbitmq.client.Channel;
23 import com.rabbitmq.client.Connection;
24 import com.rabbitmq.client.ConnectionFactory;
25
26 public class Emitter
27 {
28
29 private static final String EXCHANGE_NAME = "topic_logs";
30
31 public static void main(String[] args) throws Exception
32 {
33 if (args.length < 1)
34 {
35 System.err.println("USAGE: java Emitter rabbitmqServer (e.g. localhost) numberOfMessage (optional)");
36 System.exit(1);
37 }
38
39 final String mqServer = args[0];
40 int count = Integer.MAX_VALUE;
41 if (args.length > 1)
42 {
43 try
44 {
45 count = Integer.parseInt(args[1]);
46 } catch (Exception e) {
47
48 }
49 }
50 System.out.println("Sending " + count + " messages with random topic id");
51
52
53 ConnectionFactory factory = new ConnectionFactory();
54 factory.setHost(mqServer);
55 Connection connection = factory.newConnection();
56 Channel channel = connection.createChannel();
57
58 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
59
60 for (int i = 0; i < count; i++)
61 {
62 int rand = ((int) (Math.random() * 10000) % SetupConsumerCluster.DEFAULT_PARTITION_NUMBER);
63 String routingKey = "topic_" + rand;
64 String message = "message_" + rand;
65
66 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
67 System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
68
69 Thread.sleep(1000);
70 }
71
72 connection.close();
73 }
74
75 }