1 package org.apache.helix.recipes.rabbitmq;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.log4j.Logger;
23
24 import org.apache.helix.NotificationContext;
25 import org.apache.helix.model.Message;
26 import org.apache.helix.participant.statemachine.StateModel;
27 import org.apache.helix.participant.statemachine.StateModelInfo;
28 import org.apache.helix.participant.statemachine.Transition;
29
30 @StateModelInfo(initialState = "OFFLINE", states = { "ONLINE", "ERROR" })
31 public class ConsumerStateModel extends StateModel
32 {
33 private static Logger LOG = Logger.getLogger(ConsumerStateModel.class);
34
35 private final String _consumerId;
36 private final String _partition;
37
38 private final String _mqServer;
39 private ConsumerThread _thread = null;
40
41 public ConsumerStateModel(String consumerId, String partition, String mqServer)
42 {
43 _partition = partition;
44 _consumerId = consumerId;
45 _mqServer = mqServer;
46 }
47
48 @Transition(to = "ONLINE", from = "OFFLINE")
49 public void onBecomeOnlineFromOffline(Message message, NotificationContext context)
50 {
51 LOG.debug(_consumerId + " becomes ONLINE from OFFLINE for " + _partition);
52
53 if (_thread == null)
54 {
55 LOG.debug("Starting ConsumerThread for " + _partition + "...");
56 _thread = new ConsumerThread(_partition, _mqServer, _consumerId);
57 _thread.start();
58 LOG.debug("Starting ConsumerThread for " + _partition + " done");
59
60 }
61 }
62
63 @Transition(to = "OFFLINE", from = "ONLINE")
64 public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
65 throws InterruptedException
66 {
67 LOG.debug(_consumerId + " becomes OFFLINE from ONLINE for " + _partition);
68
69 if (_thread != null)
70 {
71 LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
72
73 _thread.interrupt();
74 _thread.join(2000);
75 _thread = null;
76 LOG.debug("Stopping " + _consumerId + " for " + _partition + " done");
77
78 }
79 }
80
81 @Transition(to = "DROPPED", from = "OFFLINE")
82 public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
83 {
84 LOG.debug(_consumerId + " becomes DROPPED from OFFLINE for " + _partition);
85 }
86
87 @Transition(to = "OFFLINE", from = "ERROR")
88 public void onBecomeOfflineFromError(Message message, NotificationContext context)
89 {
90 LOG.debug(_consumerId + " becomes OFFLINE from ERROR for " + _partition);
91 }
92
93 @Override
94 public void reset()
95 {
96 LOG.warn("Default reset() invoked");
97
98 if (_thread != null)
99 {
100 LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
101
102 _thread.interrupt();
103 try
104 {
105 _thread.join(2000);
106 } catch (InterruptedException e)
107 {
108
109 e.printStackTrace();
110 }
111 _thread = null;
112 LOG.debug("Stopping " + _consumerId + " for " + _partition + " done");
113
114 }
115 }
116 }