View Javadoc

1   package org.apache.helix.messaging.handling;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Date;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Timer;
27  import java.util.TimerTask;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ConcurrentHashMap;
30  
31  import org.apache.helix.HelixDataAccessor;
32  import org.apache.helix.HelixManager;
33  import org.apache.helix.InstanceType;
34  import org.apache.helix.NotificationContext;
35  import org.apache.helix.NotificationContext.MapKey;
36  import org.apache.helix.PropertyKey;
37  import org.apache.helix.PropertyKey.Builder;
38  import org.apache.helix.messaging.handling.GroupMessageHandler.GroupMessageInfo;
39  import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
40  import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
41  import org.apache.helix.model.CurrentState;
42  import org.apache.helix.model.Message;
43  import org.apache.helix.model.Message.Attributes;
44  import org.apache.helix.model.Message.MessageType;
45  import org.apache.helix.monitoring.StateTransitionContext;
46  import org.apache.helix.monitoring.StateTransitionDataPoint;
47  import org.apache.helix.util.StatusUpdateUtil;
48  import org.apache.log4j.Logger;
49  
50  
51  public class HelixTask implements MessageTask
52  {
53    private static Logger             logger     = Logger.getLogger(HelixTask.class);
54    private final Message             _message;
55    private final MessageHandler      _handler;
56    private final NotificationContext _notificationContext;
57    private final HelixManager        _manager;
58    StatusUpdateUtil                  _statusUpdateUtil;
59    HelixTaskExecutor                 _executor;
60    volatile boolean                  _isTimeout = false;
61  
62    public HelixTask(Message message,
63                     NotificationContext notificationContext,
64                     MessageHandler handler,
65                     HelixTaskExecutor executor)
66    {
67      this._notificationContext = notificationContext;
68      this._message = message;
69      this._handler = handler;
70      this._manager = notificationContext.getManager();
71      _statusUpdateUtil = new StatusUpdateUtil();
72      _executor = executor;
73    }
74  
75    @Override
76    public HelixTaskResult call()
77    {
78      HelixTaskResult taskResult = null;
79  
80      ErrorType type = null;
81      ErrorCode code = null;
82  
83      long start = System.currentTimeMillis();
84      logger.info("handling task: " + getTaskId() + " begin, at: " + start);
85      HelixDataAccessor accessor = _manager.getHelixDataAccessor();
86      _statusUpdateUtil.logInfo(_message,
87                                HelixTask.class,
88                                "Message handling task begin execute",
89                                accessor);
90      _message.setExecuteStartTimeStamp(new Date().getTime());
91  
92      // add a concurrent map to hold currentStateUpdates for sub-messages of a batch-message
93      // partitionName -> csUpdate
94      if (_message.getBatchMessageMode() == true) {
95    	  _notificationContext.add(MapKey.CURRENT_STATE_UPDATE.toString(), 
96    			  new ConcurrentHashMap<String, CurrentStateUpdate>());
97      }
98  
99      // Handle the message
100     try
101     {
102       taskResult = _handler.handleMessage();
103     }
104     catch (InterruptedException e)
105     {
106       taskResult = new HelixTaskResult();
107       taskResult.setException(e);
108       taskResult.setInterrupted(true);
109 
110       _statusUpdateUtil.logError(_message,
111                                  HelixTask.class,
112                                  e,
113                                  "State transition interrupted, timeout:" + _isTimeout,
114                                  accessor);
115       logger.info("Message " + _message.getMsgId() + " is interrupted");
116     }
117     catch (Exception e)
118     {
119       taskResult = new HelixTaskResult();
120       taskResult.setException(e);
121       taskResult.setMessage(e.getMessage());
122         
123       String errorMessage =
124           "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
125               + " type: " + _message.getMsgType();
126       logger.error(errorMessage, e);
127       _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
128     }
129 
130     // cancel timeout task
131     _executor.cancelTimeoutTask(this);
132     
133     Exception exception = null;
134     try
135     {
136       if (taskResult.isSuccess())
137       {
138         _statusUpdateUtil.logInfo(_message,
139                                 _handler.getClass(),
140                                 "Message handling task completed successfully",
141                                 accessor);
142         logger.info("Message " + _message.getMsgId() + " completed.");
143       }
144       else {
145     	  type = ErrorType.INTERNAL;
146     	  
147     	  if (taskResult.isInterrupted())
148           {
149     		  logger.info("Message " + _message.getMsgId() + " is interrupted");
150     		  code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
151     		  if (_isTimeout)
152     		  {
153     			  int retryCount = _message.getRetryCount();
154     			  logger.info("Message timeout, retry count: " + retryCount + " msgId:"
155     					  + _message.getMsgId());
156     			  _statusUpdateUtil.logInfo(_message,
157                                   _handler.getClass(),
158                                   "Message handling task timeout, retryCount:"
159                                       + retryCount,
160                                   accessor);
161     			  // Notify the handler that timeout happens, and the number of retries left
162     			  // In case timeout happens (time out and also interrupted)
163     			  // we should retry the execution of the message by re-schedule it in
164     			  if (retryCount > 0)
165     			  {
166     				  _message.setRetryCount(retryCount - 1);
167                       HelixTask task = new HelixTask(_message, _notificationContext, _handler, _executor);
168                       _executor.scheduleTask(task);
169                       return taskResult;
170     			  }
171     		  }
172           }
173     	  else  // logging for errors
174     	  {
175     		  code = ErrorCode.ERROR;
176     		  String errorMsg =
177     			  "Message execution failed. msgId: " + getTaskId()
178     			  + ", errorMsg: " + taskResult.getMessage();
179     		  logger.error(errorMsg);
180     		  _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
181     	  }
182       }
183       
184       if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
185     	  // System.err.println("\t[dbg]remove msg: " + getTaskId());
186           removeMessageFromZk(accessor, _message);
187           reportMessageStat(_manager, _message, taskResult);
188           sendReply(accessor, _message, taskResult);
189           _executor.finishTask(this);
190       }
191     }
192     catch (Exception e)
193     {
194       exception = e;
195       type = ErrorType.FRAMEWORK;
196       code = ErrorCode.ERROR;
197         
198       String errorMessage =
199           "Exception after executing a message, msgId: " + _message.getMsgId() + e;
200       logger.error(errorMessage, e);
201       _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
202     }
203     finally
204     {
205       long end = System.currentTimeMillis();
206       logger.info("msg: " + _message.getMsgId() + " handling task completed, results:"
207           + taskResult.isSuccess() + ", at: " + end + ", took:" + (end - start));
208 
209       // Notify the handler about any error happened in the handling procedure, so that
210       // the handler have chance to finally cleanup
211       if (type == ErrorType.INTERNAL)
212       {
213         _handler.onError(taskResult.getException(), code, type);
214       } else if (type == ErrorType.FRAMEWORK) {
215     	  _handler.onError(exception, code, type);
216       }
217     }
218     
219     return taskResult;
220   }
221 
222   private void removeMessageFromZk(HelixDataAccessor accessor, Message message)
223   {
224     Builder keyBuilder = accessor.keyBuilder();
225     if (message.getTgtName().equalsIgnoreCase("controller"))
226     {
227       // TODO: removeProperty returns boolean
228       accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
229     }
230     else
231     {
232       accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(),
233                                                  message.getMsgId()));
234     }
235   }
236 
237   private void sendReply(HelixDataAccessor accessor,
238                          Message message,
239                          HelixTaskResult taskResult)
240   {
241     if (_message.getCorrelationId() != null
242         && !message.getMsgType().equals(MessageType.TASK_REPLY.toString()))
243     {
244       logger.info("Sending reply for message " + message.getCorrelationId());
245       _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", accessor);
246 
247       taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSuccess());
248       taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted());
249       if (!taskResult.isSuccess())
250       {
251         taskResult.getTaskResultMap().put("ERRORINFO", taskResult.getMessage());
252       }
253       Message replyMessage =
254           Message.createReplyMessage(_message,
255                                      _manager.getInstanceName(),
256                                      taskResult.getTaskResultMap());
257       replyMessage.setSrcInstanceType(_manager.getInstanceType());
258 
259       if (message.getSrcInstanceType() == InstanceType.PARTICIPANT)
260       {
261         Builder keyBuilder = accessor.keyBuilder();
262         accessor.setProperty(keyBuilder.message(message.getMsgSrc(),
263                                                 replyMessage.getMsgId()),
264                              replyMessage);
265       }
266       else if (message.getSrcInstanceType() == InstanceType.CONTROLLER)
267       {
268         Builder keyBuilder = accessor.keyBuilder();
269         accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()),
270                              replyMessage);
271       }
272       _statusUpdateUtil.logInfo(message, HelixTask.class, "1 msg replied to "
273           + replyMessage.getTgtName(), accessor);
274     }
275   }
276 
277   private void reportMessageStat(HelixManager manager,
278                                  Message message,
279                                  HelixTaskResult taskResult)
280   {
281     // report stat
282     if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
283     {
284       return;
285     }
286     long now = new Date().getTime();
287     long msgReadTime = message.getReadTimeStamp();
288     long msgExecutionStartTime = message.getExecuteStartTimeStamp();
289     if (msgReadTime != 0 && msgExecutionStartTime != 0)
290     {
291       long totalDelay = now - msgReadTime;
292       long executionDelay = now - msgExecutionStartTime;
293       if (totalDelay > 0 && executionDelay > 0)
294       {
295         String fromState = message.getFromState();
296         String toState = message.getToState();
297         String transition = fromState + "--" + toState;
298 
299         StateTransitionContext cxt =
300             new StateTransitionContext(manager.getClusterName(),
301                                        manager.getInstanceName(),
302                                        message.getResourceName(),
303                                        transition);
304 
305         StateTransitionDataPoint data =
306             new StateTransitionDataPoint(totalDelay,
307                                          executionDelay,
308                                          taskResult.isSuccess());
309         _executor.getParticipantMonitor().reportTransitionStat(cxt, data);
310       }
311     }
312     else
313     {
314       logger.warn("message read time and start execution time not recorded.");
315     }
316   }
317 
318   @Override
319   public String getTaskId()
320   {
321 	  return _message.getId();
322   }
323   
324   @Override
325   public Message getMessage() {
326 		return _message;
327   }
328 
329   @Override
330   public 	NotificationContext getNotificationContext()
331   {
332 	return _notificationContext;
333   }
334 
335   @Override
336   public void onTimeout() {
337 	_isTimeout = true;
338 	_handler.onTimeout();
339   }
340 };