1 package org.apache.helix.participant;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Map;
23 import java.util.UUID;
24 import java.util.concurrent.ConcurrentHashMap;
25
26 import org.apache.helix.HelixConstants;
27 import org.apache.helix.HelixDataAccessor;
28 import org.apache.helix.HelixException;
29 import org.apache.helix.HelixManager;
30 import org.apache.helix.InstanceType;
31 import org.apache.helix.NotificationContext;
32 import org.apache.helix.NotificationContext.MapKey;
33 import org.apache.helix.PropertyKey.Builder;
34 import org.apache.helix.messaging.handling.BatchMessageHandler;
35 import org.apache.helix.messaging.handling.BatchMessageWrapper;
36 import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
37 import org.apache.helix.messaging.handling.HelixTaskExecutor;
38 import org.apache.helix.messaging.handling.MessageHandler;
39 import org.apache.helix.messaging.handling.TaskExecutor;
40 import org.apache.helix.model.CurrentState;
41 import org.apache.helix.model.Message;
42 import org.apache.helix.model.StateModelDefinition;
43 import org.apache.helix.model.Message.MessageType;
44 import org.apache.helix.participant.statemachine.StateModel;
45 import org.apache.helix.participant.statemachine.StateModelFactory;
46 import org.apache.helix.participant.statemachine.StateModelParser;
47 import org.apache.log4j.Logger;
48
49
50 public class HelixStateMachineEngine implements StateMachineEngine
51 {
52 private static Logger logger = Logger.getLogger(HelixStateMachineEngine.class);
53
54
55 private final Map<String, Map<String, StateModelFactory<? extends StateModel>>> _stateModelFactoryMap;
56 private final StateModelParser _stateModelParser;
57 private final HelixManager _manager;
58 private final ConcurrentHashMap<String, StateModelDefinition> _stateModelDefs;
59
60 public HelixStateMachineEngine(HelixManager manager)
61 {
62 _stateModelParser = new StateModelParser();
63 _manager = manager;
64
65 _stateModelFactoryMap =
66 new ConcurrentHashMap<String, Map<String, StateModelFactory<? extends StateModel>>>();
67 _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
68 }
69
70 public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName)
71 {
72 return getStateModelFactory(stateModelName,
73 HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
74 }
75
76 public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName,
77 String factoryName)
78 {
79 if (!_stateModelFactoryMap.containsKey(stateModelName))
80 {
81 return null;
82 }
83 return _stateModelFactoryMap.get(stateModelName).get(factoryName);
84 }
85
86 @Override
87 public boolean registerStateModelFactory(String stateModelDef,
88 StateModelFactory<? extends StateModel> factory)
89 {
90 return registerStateModelFactory(stateModelDef,
91 factory,
92 HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
93 }
94
95 @Override
96 public boolean registerStateModelFactory(String stateModelName,
97 StateModelFactory<? extends StateModel> factory,
98 String factoryName)
99 {
100 if (stateModelName == null || factory == null || factoryName == null)
101 {
102 throw new HelixException("stateModelDef|stateModelFactory|factoryName cannot be null");
103 }
104
105 logger.info("Register state model factory for state model " + stateModelName
106 + " using factory name " + factoryName + " with " + factory);
107
108 if (!_stateModelFactoryMap.containsKey(stateModelName))
109 {
110 _stateModelFactoryMap.put(stateModelName,
111 new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>());
112 }
113
114 if (_stateModelFactoryMap.get(stateModelName).containsKey(factoryName))
115 {
116 logger.warn("stateModelFactory for " + stateModelName + " using factoryName "
117 + factoryName + " has already been registered.");
118 return false;
119 }
120
121 _stateModelFactoryMap.get(stateModelName).put(factoryName, factory);
122 sendNopMessage();
123 return true;
124 }
125
126
127 private void sendNopMessage()
128 {
129 if (_manager.isConnected())
130 {
131 try
132 {
133 Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
134 nopMsg.setSrcName(_manager.getInstanceName());
135
136 HelixDataAccessor accessor = _manager.getHelixDataAccessor();
137 Builder keyBuilder = accessor.keyBuilder();
138
139 if (_manager.getInstanceType() == InstanceType.CONTROLLER
140 || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
141 {
142 nopMsg.setTgtName("Controller");
143 accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
144 }
145
146 if (_manager.getInstanceType() == InstanceType.PARTICIPANT
147 || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
148 {
149 nopMsg.setTgtName(_manager.getInstanceName());
150 accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()),
151 nopMsg);
152 }
153 logger.info("Send NO_OP message to " + nopMsg.getTgtName() + ", msgId: "
154 + nopMsg.getId());
155 }
156 catch (Exception e)
157 {
158 logger.error(e);
159 }
160 }
161 }
162
163 @Override
164 public void reset()
165 {
166 for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap.values())
167 {
168 for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values())
169 {
170 Map<String, ? extends StateModel> modelMap = stateModelFactory.getStateModelMap();
171 if (modelMap == null || modelMap.isEmpty())
172 {
173 continue;
174 }
175
176 for (String resourceKey : modelMap.keySet())
177 {
178 StateModel stateModel = modelMap.get(resourceKey);
179 stateModel.reset();
180 String initialState = _stateModelParser.getInitialState(stateModel.getClass());
181 stateModel.updateState(initialState);
182
183
184 }
185 }
186 }
187 }
188
189 @Override
190 public MessageHandler createHandler(Message message, NotificationContext context)
191 {
192 String type = message.getMsgType();
193
194 if (!type.equals(MessageType.STATE_TRANSITION.toString()))
195 {
196 throw new HelixException("Expect state-transition message type, but was "
197 + message.getMsgType() + ", msgId: " + message.getMsgId());
198 }
199
200 String partitionKey = message.getPartitionName();
201 String stateModelName = message.getStateModelDef();
202 String resourceName = message.getResourceName();
203 String sessionId = message.getTgtSessionId();
204 int bucketSize = message.getBucketSize();
205
206 if (stateModelName == null)
207 {
208 logger.error("Fail to create msg-handler because message does not contain stateModelDef. msgId: " + message.getId());
209 return null;
210 }
211
212 String factoryName = message.getStateModelFactoryName();
213 if (factoryName == null)
214 {
215 factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
216 }
217
218 StateModelFactory<? extends StateModel> stateModelFactory =
219 getStateModelFactory(stateModelName, factoryName);
220 if (stateModelFactory == null)
221 {
222 logger.warn("Fail to create msg-handler because cannot find stateModelFactory for model: " + stateModelName
223 + " using factoryName: " + factoryName + " for resource: " + resourceName);
224 return null;
225 }
226
227
228 if (!_stateModelDefs.containsKey(stateModelName))
229 {
230 HelixDataAccessor accessor = _manager.getHelixDataAccessor();
231 Builder keyBuilder = accessor.keyBuilder();
232 StateModelDefinition stateModelDef =
233 accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
234 if (stateModelDef == null)
235 {
236 throw new HelixException("fail to create msg-handler because stateModelDef for " + stateModelName
237 + " does NOT exist");
238 }
239 _stateModelDefs.put(stateModelName, stateModelDef);
240 }
241
242 if (message.getBatchMessageMode() == false) {
243
244 String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
245 StateModel stateModel = stateModelFactory.getStateModel(partitionKey);
246 if (stateModel == null)
247 {
248 stateModel = stateModelFactory.createAndAddStateModel(partitionKey);
249 stateModel.updateState(initState);
250 }
251
252
253 CurrentState currentStateDelta = new CurrentState(resourceName);
254 currentStateDelta.setSessionId(sessionId);
255 currentStateDelta.setStateModelDefRef(stateModelName);
256 currentStateDelta.setStateModelFactoryName(factoryName);
257 currentStateDelta.setBucketSize(bucketSize);
258
259 currentStateDelta.setState(partitionKey, (stateModel.getCurrentState() == null)
260 ? initState : stateModel.getCurrentState());
261
262 return new HelixStateTransitionHandler(stateModel,
263 message,
264 context,
265 currentStateDelta);
266 } else
267 {
268 BatchMessageWrapper wrapper = stateModelFactory.getBatchMessageWrapper(resourceName);
269 if (wrapper == null)
270 {
271 wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceName);
272 }
273
274
275 TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
276 if (executor == null)
277 {
278 logger.error("fail to get executor-service for batch message: " + message.getId()
279 + ". msgType: " + message.getMsgType() + ", resource: " + message.getResourceName());
280 return null;
281 }
282 return new BatchMessageHandler(message, context, this, wrapper, executor);
283 }
284 }
285
286 @Override
287 public String getMessageType()
288 {
289 return MessageType.STATE_TRANSITION.toString();
290 }
291
292 @Override
293 public boolean removeStateModelFactory(String stateModelDef,
294 StateModelFactory<? extends StateModel> factory)
295 {
296 throw new UnsupportedOperationException("Remove not yet supported");
297 }
298
299 @Override
300 public boolean removeStateModelFactory(String stateModelDef,
301 StateModelFactory<? extends StateModel> factory,
302 String factoryName)
303 {
304 throw new UnsupportedOperationException("Remove not yet supported");
305 }
306 }