1 package org.apache.helix.participant;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.helix.HelixManager;
23 import org.apache.helix.HelixManagerFactory;
24 import org.apache.helix.InstanceType;
25 import org.apache.helix.NotificationContext;
26 import org.apache.helix.model.Message;
27 import org.apache.helix.participant.statemachine.StateModel;
28 import org.apache.helix.participant.statemachine.StateModelInfo;
29 import org.apache.helix.participant.statemachine.StateModelParser;
30 import org.apache.helix.participant.statemachine.StateTransitionError;
31 import org.apache.helix.participant.statemachine.Transition;
32 import org.apache.log4j.Logger;
33
34
35
36 @StateModelInfo(initialState = "OFFLINE", states = { "LEADER", "STANDBY" })
37 public class DistClusterControllerStateModel extends StateModel
38 {
39 private static Logger logger = Logger.getLogger(DistClusterControllerStateModel.class);
40 private HelixManager _controller = null;
41 private final String _zkAddr;
42
43 public DistClusterControllerStateModel(String zkAddr)
44 {
45 StateModelParser parser = new StateModelParser();
46 _currentState = parser.getInitialState(DistClusterControllerStateModel.class);
47 _zkAddr = zkAddr;
48 }
49
50 @Transition(to="STANDBY",from="OFFLINE")
51 public void onBecomeStandbyFromOffline(Message message, NotificationContext context)
52 {
53 logger.info("Becoming standby from offline");
54 }
55
56 @Transition(to="LEADER",from="STANDBY")
57 public void onBecomeLeaderFromStandby(Message message, NotificationContext context)
58 throws Exception
59 {
60 String clusterName = message.getPartitionName();
61 String controllerName = message.getTgtName();
62
63 logger.info(controllerName + " becomes leader from standby for " + clusterName);
64
65
66 if (_controller == null)
67 {
68 _controller = HelixManagerFactory
69 .getZKHelixManager(clusterName, controllerName, InstanceType.CONTROLLER, _zkAddr);
70 _controller.connect();
71 _controller.startTimerTasks();
72 }
73 else
74 {
75 logger.error("controller already exists:" + _controller.getInstanceName()
76 + " for " + clusterName);
77 }
78
79 }
80
81 @Transition(to="STANDBY",from="LEADER")
82 public void onBecomeStandbyFromLeader(Message message, NotificationContext context)
83 {
84 String clusterName = message.getPartitionName();
85 String controllerName = message.getTgtName();
86
87 logger.info(controllerName + " becoming standby from leader for " + clusterName);
88
89 if (_controller != null)
90 {
91 _controller.disconnect();
92 _controller = null;
93 }
94 else
95 {
96 logger.error("No controller exists for " + clusterName);
97 }
98 }
99
100 @Transition(to="OFFLINE",from="STANDBY")
101 public void onBecomeOfflineFromStandby(Message message, NotificationContext context)
102 {
103 String clusterName = message.getPartitionName();
104 String controllerName = message.getTgtName();
105
106 logger.info(controllerName + " becoming offline from standby for cluster:" + clusterName);
107
108 }
109
110 @Transition(to="DROPPED",from="OFFLINE")
111 public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
112 {
113 logger.info("Becoming dropped from offline");
114 }
115
116 @Transition(to="OFFLINE",from="DROPPED")
117 public void onBecomeOfflineFromDropped(Message message, NotificationContext context)
118 {
119 logger.info("Becoming offline from dropped");
120 }
121
122
123 @Override
124 public void rollbackOnError(Message message, NotificationContext context,
125 StateTransitionError error)
126 {
127 String clusterName = message.getPartitionName();
128 String controllerName = message.getTgtName();
129
130 logger.error(controllerName + " rollbacks on error for " + clusterName);
131
132 if (_controller != null)
133 {
134 _controller.disconnect();
135 _controller = null;
136 }
137
138 }
139
140 @Override
141 public void reset()
142 {
143 if (_controller != null)
144 {
145
146
147
148 _controller.disconnect();
149 _controller = null;
150 }
151
152 }
153 }