1 package org.apache.helix.messaging.handling;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ConcurrentHashMap;
30
31 import org.apache.helix.HelixDataAccessor;
32 import org.apache.helix.HelixManager;
33 import org.apache.helix.InstanceType;
34 import org.apache.helix.NotificationContext;
35 import org.apache.helix.NotificationContext.MapKey;
36 import org.apache.helix.PropertyKey;
37 import org.apache.helix.PropertyKey.Builder;
38 import org.apache.helix.messaging.handling.GroupMessageHandler.GroupMessageInfo;
39 import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
40 import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
41 import org.apache.helix.model.CurrentState;
42 import org.apache.helix.model.Message;
43 import org.apache.helix.model.Message.Attributes;
44 import org.apache.helix.model.Message.MessageType;
45 import org.apache.helix.monitoring.StateTransitionContext;
46 import org.apache.helix.monitoring.StateTransitionDataPoint;
47 import org.apache.helix.util.StatusUpdateUtil;
48 import org.apache.log4j.Logger;
49
50
51 public class HelixTask implements MessageTask
52 {
53 private static Logger logger = Logger.getLogger(HelixTask.class);
54 private final Message _message;
55 private final MessageHandler _handler;
56 private final NotificationContext _notificationContext;
57 private final HelixManager _manager;
58 StatusUpdateUtil _statusUpdateUtil;
59 HelixTaskExecutor _executor;
60 volatile boolean _isTimeout = false;
61
62 public HelixTask(Message message,
63 NotificationContext notificationContext,
64 MessageHandler handler,
65 HelixTaskExecutor executor)
66 {
67 this._notificationContext = notificationContext;
68 this._message = message;
69 this._handler = handler;
70 this._manager = notificationContext.getManager();
71 _statusUpdateUtil = new StatusUpdateUtil();
72 _executor = executor;
73 }
74
75 @Override
76 public HelixTaskResult call()
77 {
78 HelixTaskResult taskResult = null;
79
80 ErrorType type = null;
81 ErrorCode code = null;
82
83 long start = System.currentTimeMillis();
84 logger.info("handling task: " + getTaskId() + " begin, at: " + start);
85 HelixDataAccessor accessor = _manager.getHelixDataAccessor();
86 _statusUpdateUtil.logInfo(_message,
87 HelixTask.class,
88 "Message handling task begin execute",
89 accessor);
90 _message.setExecuteStartTimeStamp(new Date().getTime());
91
92
93
94 if (_message.getBatchMessageMode() == true) {
95 _notificationContext.add(MapKey.CURRENT_STATE_UPDATE.toString(),
96 new ConcurrentHashMap<String, CurrentStateUpdate>());
97 }
98
99
100 try
101 {
102 taskResult = _handler.handleMessage();
103 }
104 catch (InterruptedException e)
105 {
106 taskResult = new HelixTaskResult();
107 taskResult.setException(e);
108 taskResult.setInterrupted(true);
109
110 _statusUpdateUtil.logError(_message,
111 HelixTask.class,
112 e,
113 "State transition interrupted, timeout:" + _isTimeout,
114 accessor);
115 logger.info("Message " + _message.getMsgId() + " is interrupted");
116 }
117 catch (Exception e)
118 {
119 taskResult = new HelixTaskResult();
120 taskResult.setException(e);
121 taskResult.setMessage(e.getMessage());
122
123 String errorMessage =
124 "Exception while executing a message. " + e + " msgId: " + _message.getMsgId()
125 + " type: " + _message.getMsgType();
126 logger.error(errorMessage, e);
127 _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, accessor);
128 }
129
130
131 _executor.cancelTimeoutTask(this);
132
133 Exception exception = null;
134 try
135 {
136 if (taskResult.isSuccess())
137 {
138 _statusUpdateUtil.logInfo(_message,
139 _handler.getClass(),
140 "Message handling task completed successfully",
141 accessor);
142 logger.info("Message " + _message.getMsgId() + " completed.");
143 }
144 else {
145 type = ErrorType.INTERNAL;
146
147 if (taskResult.isInterrupted())
148 {
149 logger.info("Message " + _message.getMsgId() + " is interrupted");
150 code = _isTimeout ? ErrorCode.TIMEOUT : ErrorCode.CANCEL;
151 if (_isTimeout)
152 {
153 int retryCount = _message.getRetryCount();
154 logger.info("Message timeout, retry count: " + retryCount + " msgId:"
155 + _message.getMsgId());
156 _statusUpdateUtil.logInfo(_message,
157 _handler.getClass(),
158 "Message handling task timeout, retryCount:"
159 + retryCount,
160 accessor);
161
162
163
164 if (retryCount > 0)
165 {
166 _message.setRetryCount(retryCount - 1);
167 HelixTask task = new HelixTask(_message, _notificationContext, _handler, _executor);
168 _executor.scheduleTask(task);
169 return taskResult;
170 }
171 }
172 }
173 else
174 {
175 code = ErrorCode.ERROR;
176 String errorMsg =
177 "Message execution failed. msgId: " + getTaskId()
178 + ", errorMsg: " + taskResult.getMessage();
179 logger.error(errorMsg);
180 _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, accessor);
181 }
182 }
183
184 if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
185
186 removeMessageFromZk(accessor, _message);
187 reportMessageStat(_manager, _message, taskResult);
188 sendReply(accessor, _message, taskResult);
189 _executor.finishTask(this);
190 }
191 }
192 catch (Exception e)
193 {
194 exception = e;
195 type = ErrorType.FRAMEWORK;
196 code = ErrorCode.ERROR;
197
198 String errorMessage =
199 "Exception after executing a message, msgId: " + _message.getMsgId() + e;
200 logger.error(errorMessage, e);
201 _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, accessor);
202 }
203 finally
204 {
205 long end = System.currentTimeMillis();
206 logger.info("msg: " + _message.getMsgId() + " handling task completed, results:"
207 + taskResult.isSuccess() + ", at: " + end + ", took:" + (end - start));
208
209
210
211 if (type == ErrorType.INTERNAL)
212 {
213 _handler.onError(taskResult.getException(), code, type);
214 } else if (type == ErrorType.FRAMEWORK) {
215 _handler.onError(exception, code, type);
216 }
217 }
218
219 return taskResult;
220 }
221
222 private void removeMessageFromZk(HelixDataAccessor accessor, Message message)
223 {
224 Builder keyBuilder = accessor.keyBuilder();
225 if (message.getTgtName().equalsIgnoreCase("controller"))
226 {
227
228 accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
229 }
230 else
231 {
232 accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(),
233 message.getMsgId()));
234 }
235 }
236
237 private void sendReply(HelixDataAccessor accessor,
238 Message message,
239 HelixTaskResult taskResult)
240 {
241 if (_message.getCorrelationId() != null
242 && !message.getMsgType().equals(MessageType.TASK_REPLY.toString()))
243 {
244 logger.info("Sending reply for message " + message.getCorrelationId());
245 _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", accessor);
246
247 taskResult.getTaskResultMap().put("SUCCESS", "" + taskResult.isSuccess());
248 taskResult.getTaskResultMap().put("INTERRUPTED", "" + taskResult.isInterrupted());
249 if (!taskResult.isSuccess())
250 {
251 taskResult.getTaskResultMap().put("ERRORINFO", taskResult.getMessage());
252 }
253 Message replyMessage =
254 Message.createReplyMessage(_message,
255 _manager.getInstanceName(),
256 taskResult.getTaskResultMap());
257 replyMessage.setSrcInstanceType(_manager.getInstanceType());
258
259 if (message.getSrcInstanceType() == InstanceType.PARTICIPANT)
260 {
261 Builder keyBuilder = accessor.keyBuilder();
262 accessor.setProperty(keyBuilder.message(message.getMsgSrc(),
263 replyMessage.getMsgId()),
264 replyMessage);
265 }
266 else if (message.getSrcInstanceType() == InstanceType.CONTROLLER)
267 {
268 Builder keyBuilder = accessor.keyBuilder();
269 accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()),
270 replyMessage);
271 }
272 _statusUpdateUtil.logInfo(message, HelixTask.class, "1 msg replied to "
273 + replyMessage.getTgtName(), accessor);
274 }
275 }
276
277 private void reportMessageStat(HelixManager manager,
278 Message message,
279 HelixTaskResult taskResult)
280 {
281
282 if (!message.getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
283 {
284 return;
285 }
286 long now = new Date().getTime();
287 long msgReadTime = message.getReadTimeStamp();
288 long msgExecutionStartTime = message.getExecuteStartTimeStamp();
289 if (msgReadTime != 0 && msgExecutionStartTime != 0)
290 {
291 long totalDelay = now - msgReadTime;
292 long executionDelay = now - msgExecutionStartTime;
293 if (totalDelay > 0 && executionDelay > 0)
294 {
295 String fromState = message.getFromState();
296 String toState = message.getToState();
297 String transition = fromState + "--" + toState;
298
299 StateTransitionContext cxt =
300 new StateTransitionContext(manager.getClusterName(),
301 manager.getInstanceName(),
302 message.getResourceName(),
303 transition);
304
305 StateTransitionDataPoint data =
306 new StateTransitionDataPoint(totalDelay,
307 executionDelay,
308 taskResult.isSuccess());
309 _executor.getParticipantMonitor().reportTransitionStat(cxt, data);
310 }
311 }
312 else
313 {
314 logger.warn("message read time and start execution time not recorded.");
315 }
316 }
317
318 @Override
319 public String getTaskId()
320 {
321 return _message.getId();
322 }
323
324 @Override
325 public Message getMessage() {
326 return _message;
327 }
328
329 @Override
330 public NotificationContext getNotificationContext()
331 {
332 return _notificationContext;
333 }
334
335 @Override
336 public void onTimeout() {
337 _isTimeout = true;
338 _handler.onTimeout();
339 }
340 };