View Javadoc

1   package org.apache.helix.participant;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Map;
23  import java.util.UUID;
24  import java.util.concurrent.ConcurrentHashMap;
25  
26  import org.apache.helix.HelixConstants;
27  import org.apache.helix.HelixDataAccessor;
28  import org.apache.helix.HelixException;
29  import org.apache.helix.HelixManager;
30  import org.apache.helix.InstanceType;
31  import org.apache.helix.NotificationContext;
32  import org.apache.helix.NotificationContext.MapKey;
33  import org.apache.helix.PropertyKey.Builder;
34  import org.apache.helix.messaging.handling.BatchMessageHandler;
35  import org.apache.helix.messaging.handling.BatchMessageWrapper;
36  import org.apache.helix.messaging.handling.HelixStateTransitionHandler;
37  import org.apache.helix.messaging.handling.HelixTaskExecutor;
38  import org.apache.helix.messaging.handling.MessageHandler;
39  import org.apache.helix.messaging.handling.TaskExecutor;
40  import org.apache.helix.model.CurrentState;
41  import org.apache.helix.model.Message;
42  import org.apache.helix.model.StateModelDefinition;
43  import org.apache.helix.model.Message.MessageType;
44  import org.apache.helix.participant.statemachine.StateModel;
45  import org.apache.helix.participant.statemachine.StateModelFactory;
46  import org.apache.helix.participant.statemachine.StateModelParser;
47  import org.apache.log4j.Logger;
48  
49  
50  public class HelixStateMachineEngine implements StateMachineEngine
51  {
52    private static Logger logger = Logger.getLogger(HelixStateMachineEngine.class);
53  
54    // StateModelName->FactoryName->StateModelFactory
55    private final Map<String, Map<String, StateModelFactory<? extends StateModel>>> _stateModelFactoryMap;
56    private final StateModelParser _stateModelParser;
57    private final HelixManager _manager;
58    private final ConcurrentHashMap<String, StateModelDefinition> _stateModelDefs;
59  
60    public HelixStateMachineEngine(HelixManager manager)
61    {
62      _stateModelParser = new StateModelParser();
63      _manager = manager;
64  
65      _stateModelFactoryMap =
66          new ConcurrentHashMap<String, Map<String, StateModelFactory<? extends StateModel>>>();
67      _stateModelDefs = new ConcurrentHashMap<String, StateModelDefinition>();
68    }
69  
70    public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName)
71    {
72      return getStateModelFactory(stateModelName,
73                                  HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
74    }
75  
76    public StateModelFactory<? extends StateModel> getStateModelFactory(String stateModelName,
77                                                                        String factoryName)
78    {
79      if (!_stateModelFactoryMap.containsKey(stateModelName))
80      {
81        return null;
82      }
83      return _stateModelFactoryMap.get(stateModelName).get(factoryName);
84    }
85  
86    @Override
87    public boolean registerStateModelFactory(String stateModelDef,
88                                             StateModelFactory<? extends StateModel> factory)
89    {
90      return registerStateModelFactory(stateModelDef,
91                                       factory,
92                                       HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
93    }
94  
95    @Override
96    public boolean registerStateModelFactory(String stateModelName,
97                                             StateModelFactory<? extends StateModel> factory,
98                                             String factoryName)
99    {
100     if (stateModelName == null || factory == null || factoryName == null)
101     {
102       throw new HelixException("stateModelDef|stateModelFactory|factoryName cannot be null");
103     }
104 
105     logger.info("Register state model factory for state model " + stateModelName
106         + " using factory name " + factoryName + " with " + factory);
107 
108     if (!_stateModelFactoryMap.containsKey(stateModelName))
109     {
110       _stateModelFactoryMap.put(stateModelName,
111                                 new ConcurrentHashMap<String, StateModelFactory<? extends StateModel>>());
112     }
113 
114     if (_stateModelFactoryMap.get(stateModelName).containsKey(factoryName))
115     {
116       logger.warn("stateModelFactory for " + stateModelName + " using factoryName "
117           + factoryName + " has already been registered.");
118       return false;
119     }
120 
121     _stateModelFactoryMap.get(stateModelName).put(factoryName, factory);
122     sendNopMessage();
123     return true;
124   }
125 
126   // TODO: duplicated code in DefaultMessagingService
127   private void sendNopMessage()
128   {
129     if (_manager.isConnected())
130     {
131       try
132       {
133         Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
134         nopMsg.setSrcName(_manager.getInstanceName());
135 
136         HelixDataAccessor accessor = _manager.getHelixDataAccessor();
137         Builder keyBuilder = accessor.keyBuilder();
138 
139         if (_manager.getInstanceType() == InstanceType.CONTROLLER
140             || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
141         {
142           nopMsg.setTgtName("Controller");
143           accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
144         }
145 
146         if (_manager.getInstanceType() == InstanceType.PARTICIPANT
147             || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
148         {
149           nopMsg.setTgtName(_manager.getInstanceName());
150           accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()),
151                                nopMsg);
152         }
153         logger.info("Send NO_OP message to " + nopMsg.getTgtName() + ", msgId: "
154             + nopMsg.getId());
155       }
156       catch (Exception e)
157       {
158         logger.error(e);
159       }
160     }
161   }
162 
163   @Override
164   public void reset()
165   {
166     for (Map<String, StateModelFactory<? extends StateModel>> ftyMap : _stateModelFactoryMap.values())
167     {
168       for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values())
169       {
170         Map<String, ? extends StateModel> modelMap = stateModelFactory.getStateModelMap();
171         if (modelMap == null || modelMap.isEmpty())
172         {
173           continue;
174         }
175 
176         for (String resourceKey : modelMap.keySet())
177         {
178           StateModel stateModel = modelMap.get(resourceKey);
179           stateModel.reset();
180           String initialState = _stateModelParser.getInitialState(stateModel.getClass());
181           stateModel.updateState(initialState);
182           // TODO probably should update the state on ZK. Shi confirm what needs
183           // to be done here.
184         }
185       }
186     }
187   }
188 
189   @Override
190   public MessageHandler createHandler(Message message, NotificationContext context)
191   {
192     String type = message.getMsgType();
193 
194     if (!type.equals(MessageType.STATE_TRANSITION.toString()))
195     {
196       throw new HelixException("Expect state-transition message type, but was " 
197     		  + message.getMsgType() + ", msgId: " + message.getMsgId());
198     }
199 
200     String partitionKey = message.getPartitionName();
201     String stateModelName = message.getStateModelDef();
202     String resourceName = message.getResourceName();
203     String sessionId = message.getTgtSessionId();
204     int bucketSize = message.getBucketSize();
205 
206     if (stateModelName == null)
207     {
208       logger.error("Fail to create msg-handler because message does not contain stateModelDef. msgId: " + message.getId());
209       return null;
210     }
211 
212     String factoryName = message.getStateModelFactoryName();
213     if (factoryName == null)
214     {
215       factoryName = HelixConstants.DEFAULT_STATE_MODEL_FACTORY;
216     }
217 
218     StateModelFactory<? extends StateModel> stateModelFactory =
219         getStateModelFactory(stateModelName, factoryName);
220     if (stateModelFactory == null)
221     {
222       logger.warn("Fail to create msg-handler because cannot find stateModelFactory for model: " + stateModelName
223           + " using factoryName: " + factoryName + " for resource: " + resourceName);
224       return null;
225     }
226 
227     // check if the state model definition exists and cache it
228     if (!_stateModelDefs.containsKey(stateModelName))
229     {
230       HelixDataAccessor accessor = _manager.getHelixDataAccessor();
231       Builder keyBuilder = accessor.keyBuilder();
232       StateModelDefinition stateModelDef =
233           accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
234       if (stateModelDef == null)
235       {
236         throw new HelixException("fail to create msg-handler because stateModelDef for " + stateModelName
237             + " does NOT exist");
238       }
239       _stateModelDefs.put(stateModelName, stateModelDef);
240     }
241 
242     if (message.getBatchMessageMode() == false) {
243         // create currentStateDelta for this partition
244         String initState = _stateModelDefs.get(message.getStateModelDef()).getInitialState();
245         StateModel stateModel = stateModelFactory.getStateModel(partitionKey);
246         if (stateModel == null)
247         {
248           stateModel = stateModelFactory.createAndAddStateModel(partitionKey);
249           stateModel.updateState(initState);
250         }
251 
252         // TODO: move currentStateDelta to StateTransitionMsgHandler
253         CurrentState currentStateDelta = new CurrentState(resourceName);
254         currentStateDelta.setSessionId(sessionId);
255         currentStateDelta.setStateModelDefRef(stateModelName);
256         currentStateDelta.setStateModelFactoryName(factoryName);
257         currentStateDelta.setBucketSize(bucketSize);
258 
259         currentStateDelta.setState(partitionKey, (stateModel.getCurrentState() == null)
260             ? initState : stateModel.getCurrentState());
261 
262         return new HelixStateTransitionHandler(stateModel,
263                                                message,
264                                                context,
265                                                currentStateDelta);
266     } else
267     {    	
268       BatchMessageWrapper wrapper = stateModelFactory.getBatchMessageWrapper(resourceName);
269       if (wrapper == null)
270       {
271         wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceName);
272       }
273       
274     	// get executor-service for the message
275     	TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString());
276     	if (executor == null)
277     	{
278     		logger.error("fail to get executor-service for batch message: " + message.getId() 
279     				+ ". msgType: " + message.getMsgType() + ", resource: " + message.getResourceName());
280     		return null;
281     	}
282     	return new BatchMessageHandler(message, context, this, wrapper, executor);
283     }  
284   }
285 
286   @Override
287   public String getMessageType()
288   {
289     return MessageType.STATE_TRANSITION.toString();
290   }
291 
292   @Override
293   public boolean removeStateModelFactory(String stateModelDef,
294                                          StateModelFactory<? extends StateModel> factory)
295   {
296     throw new UnsupportedOperationException("Remove not yet supported");
297   }
298 
299   @Override
300   public boolean removeStateModelFactory(String stateModelDef,
301                                          StateModelFactory<? extends StateModel> factory,
302                                          String factoryName)
303   {
304     throw new UnsupportedOperationException("Remove not yet supported");
305   }
306 }