1 package org.apache.helix.model;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.Comparator;
25 import java.util.Date;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.UUID;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31 import org.apache.helix.HelixException;
32 import org.apache.helix.HelixProperty;
33 import org.apache.helix.InstanceType;
34 import org.apache.helix.PropertyKey;
35 import org.apache.helix.ZNRecord;
36 import org.apache.helix.PropertyKey.Builder;
37
38
39
40
41
42
43 public class Message extends HelixProperty
44 {
45 public enum MessageType
46 {
47 STATE_TRANSITION,
48 SCHEDULER_MSG,
49 USER_DEFINE_MSG,
50 CONTROLLER_MSG,
51 TASK_REPLY,
52 NO_OP,
53 PARTICIPANT_ERROR_REPORT
54 };
55
56 public enum Attributes
57 {
58 MSG_ID,
59 SRC_SESSION_ID,
60 TGT_SESSION_ID,
61 SRC_NAME,
62 TGT_NAME,
63 SRC_INSTANCE_TYPE,
64 MSG_STATE,
65 PARTITION_NAME,
66 RESOURCE_NAME,
67 FROM_STATE,
68 TO_STATE,
69 STATE_MODEL_DEF,
70 CREATE_TIMESTAMP,
71 READ_TIMESTAMP,
72 EXECUTE_START_TIMESTAMP,
73 MSG_TYPE,
74 MSG_SUBTYPE,
75 CORRELATION_ID,
76 MESSAGE_RESULT,
77 EXE_SESSION_ID,
78 TIMEOUT,
79 RETRY_COUNT,
80 STATE_MODEL_FACTORY_NAME,
81 BUCKET_SIZE,
82 PARENT_MSG_ID,
83 INNER_MESSAGE
84 }
85
86 public enum MessageState
87 {
88 NEW,
89 READ,
90 UNPROCESSABLE
91 }
92
93 public static final Comparator<Message> CREATE_TIME_COMPARATOR = new Comparator<Message>(){
94 @Override
95 public int compare(Message m1, Message m2)
96 {
97
98
99 return (int) (m1.getCreateTimeStamp() - m2.getCreateTimeStamp());
100 }
101 };
102
103
104
105 public Message(MessageType type, String msgId)
106 {
107 this(type.toString(), msgId);
108 }
109
110 public Message(String type, String msgId)
111 {
112 super(new ZNRecord(msgId));
113 _record.setSimpleField(Attributes.MSG_TYPE.toString(), type);
114 setMsgId(msgId);
115 setMsgState(MessageState.NEW);
116 _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(),
117 "" + new Date().getTime());
118 }
119
120 public Message(ZNRecord record)
121 {
122 super(record);
123 if (getMsgState() == null)
124 {
125 setMsgState(MessageState.NEW);
126 }
127 if (getCreateTimeStamp() == 0)
128 {
129 _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(),
130 "" + new Date().getTime());
131 }
132 }
133
134 public void setCreateTimeStamp(long timestamp)
135 {
136 _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(), "" + timestamp);
137 }
138
139 public Message(ZNRecord record, String id)
140 {
141 super(new ZNRecord(record, id));
142 setMsgId(id);
143 }
144
145 public void setMsgSubType(String subType)
146 {
147 _record.setSimpleField(Attributes.MSG_SUBTYPE.toString(), subType);
148 }
149
150 public String getMsgSubType()
151 {
152 return _record.getSimpleField(Attributes.MSG_SUBTYPE.toString());
153 }
154
155 void setMsgType(MessageType type)
156 {
157 _record.setSimpleField(Attributes.MSG_TYPE.toString(), type.toString());
158 }
159
160 public String getMsgType()
161 {
162 return _record.getSimpleField(Attributes.MSG_TYPE.toString());
163 }
164
165 public String getTgtSessionId()
166 {
167 return _record.getSimpleField(Attributes.TGT_SESSION_ID.toString());
168 }
169
170 public void setTgtSessionId(String tgtSessionId)
171 {
172 _record.setSimpleField(Attributes.TGT_SESSION_ID.toString(), tgtSessionId);
173 }
174
175 public String getSrcSessionId()
176 {
177 return _record.getSimpleField(Attributes.SRC_SESSION_ID.toString());
178 }
179
180 public void setSrcSessionId(String srcSessionId)
181 {
182 _record.setSimpleField(Attributes.SRC_SESSION_ID.toString(), srcSessionId);
183 }
184
185 public String getExecutionSessionId()
186 {
187 return _record.getSimpleField(Attributes.EXE_SESSION_ID.toString());
188 }
189
190 public void setExecuteSessionId(String exeSessionId)
191 {
192 _record.setSimpleField(Attributes.EXE_SESSION_ID.toString(), exeSessionId);
193 }
194
195 public String getMsgSrc()
196 {
197 return _record.getSimpleField(Attributes.SRC_NAME.toString());
198 }
199
200 public void setSrcInstanceType(InstanceType type)
201 {
202 _record.setSimpleField(Attributes.SRC_INSTANCE_TYPE.toString(), type.toString());
203 }
204
205 public InstanceType getSrcInstanceType()
206 {
207 if (_record.getSimpleFields().containsKey(Attributes.SRC_INSTANCE_TYPE.toString()))
208 {
209 return InstanceType.valueOf(_record.getSimpleField(Attributes.SRC_INSTANCE_TYPE.toString()));
210 }
211 return InstanceType.PARTICIPANT;
212 }
213
214 public void setSrcName(String msgSrc)
215 {
216 _record.setSimpleField(Attributes.SRC_NAME.toString(), msgSrc);
217 }
218
219 public String getTgtName()
220 {
221 return _record.getSimpleField(Attributes.TGT_NAME.toString());
222 }
223
224 public void setMsgState(MessageState msgState)
225 {
226 _record.setSimpleField(Attributes.MSG_STATE.toString(), msgState.toString()
227 .toLowerCase());
228 }
229
230 public MessageState getMsgState()
231 {
232
233 return MessageState.valueOf(_record.getSimpleField(Attributes.MSG_STATE.toString())
234 .toUpperCase());
235 }
236
237 public void setPartitionName(String partitionName)
238 {
239 _record.setSimpleField(Attributes.PARTITION_NAME.toString(), partitionName);
240 }
241
242 public String getMsgId()
243 {
244 return _record.getSimpleField(Attributes.MSG_ID.toString());
245 }
246
247 public void setMsgId(String msgId)
248 {
249 _record.setSimpleField(Attributes.MSG_ID.toString(), msgId);
250 }
251
252 public void setFromState(String state)
253 {
254 _record.setSimpleField(Attributes.FROM_STATE.toString(), state);
255 }
256
257 public String getFromState()
258 {
259 return _record.getSimpleField(Attributes.FROM_STATE.toString());
260 }
261
262 public void setToState(String state)
263 {
264 _record.setSimpleField(Attributes.TO_STATE.toString(), state);
265 }
266
267 public String getToState()
268 {
269 return _record.getSimpleField(Attributes.TO_STATE.toString());
270 }
271
272 public void setTgtName(String msgTgt)
273 {
274 _record.setSimpleField(Attributes.TGT_NAME.toString(), msgTgt);
275 }
276
277 public Boolean getDebug()
278 {
279 return false;
280 }
281
282 public Integer getGeneration()
283 {
284 return 1;
285 }
286
287 public void setResourceName(String resourceName)
288 {
289 _record.setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName);
290 }
291
292 public String getResourceName()
293 {
294 return _record.getSimpleField(Attributes.RESOURCE_NAME.toString());
295 }
296
297 public String getPartitionName()
298 {
299 return _record.getSimpleField(Attributes.PARTITION_NAME.toString());
300 }
301
302 public String getStateModelDef()
303 {
304 return _record.getSimpleField(Attributes.STATE_MODEL_DEF.toString());
305 }
306
307 public void setStateModelDef(String stateModelDefName)
308 {
309 _record.setSimpleField(Attributes.STATE_MODEL_DEF.toString(), stateModelDefName);
310 }
311
312 public void setReadTimeStamp(long time)
313 {
314 _record.setSimpleField(Attributes.READ_TIMESTAMP.toString(), "" + time);
315 }
316
317 public void setExecuteStartTimeStamp(long time)
318 {
319 _record.setSimpleField(Attributes.EXECUTE_START_TIMESTAMP.toString(), "" + time);
320 }
321
322 public long getReadTimeStamp()
323 {
324 String timestamp = _record.getSimpleField(Attributes.READ_TIMESTAMP.toString());
325 if (timestamp == null)
326 {
327 return 0;
328 }
329 try
330 {
331 return Long.parseLong(timestamp);
332 }
333 catch (Exception e)
334 {
335 return 0;
336 }
337
338 }
339
340 public long getExecuteStartTimeStamp()
341 {
342 String timestamp =
343 _record.getSimpleField(Attributes.EXECUTE_START_TIMESTAMP.toString());
344 if (timestamp == null)
345 {
346 return 0;
347 }
348 try
349 {
350 return Long.parseLong(timestamp);
351 }
352 catch (Exception e)
353 {
354 return 0;
355 }
356 }
357
358 public long getCreateTimeStamp()
359 {
360 if (_record.getSimpleField(Attributes.CREATE_TIMESTAMP.toString()) == null)
361 {
362 return 0;
363 }
364 try
365 {
366 return Long.parseLong(_record.getSimpleField(Attributes.CREATE_TIMESTAMP.toString()));
367 }
368 catch (Exception e)
369 {
370 return 0;
371 }
372 }
373
374 public void setCorrelationId(String correlationId)
375 {
376 _record.setSimpleField(Attributes.CORRELATION_ID.toString(), correlationId);
377 }
378
379 public String getCorrelationId()
380 {
381 return _record.getSimpleField(Attributes.CORRELATION_ID.toString());
382 }
383
384 public int getExecutionTimeout()
385 {
386 if (!_record.getSimpleFields().containsKey(Attributes.TIMEOUT.toString()))
387 {
388 return -1;
389 }
390 try
391 {
392 return Integer.parseInt(_record.getSimpleField(Attributes.TIMEOUT.toString()));
393 }
394 catch (Exception e)
395 {
396 }
397 return -1;
398 }
399
400 public void setExecutionTimeout(int timeout)
401 {
402 _record.setSimpleField(Attributes.TIMEOUT.toString(), "" + timeout);
403 }
404
405 public void setRetryCount(int retryCount)
406 {
407 _record.setSimpleField(Attributes.RETRY_COUNT.toString(), "" + retryCount);
408 }
409
410 public int getRetryCount()
411 {
412 try
413 {
414 return Integer.parseInt(_record.getSimpleField(Attributes.RETRY_COUNT.toString()));
415 }
416 catch (Exception e)
417 {
418 }
419
420 return 0;
421 }
422
423 public Map<String, String> getResultMap()
424 {
425 return _record.getMapField(Attributes.MESSAGE_RESULT.toString());
426 }
427
428 public void setResultMap(Map<String, String> resultMap)
429 {
430 _record.setMapField(Attributes.MESSAGE_RESULT.toString(), resultMap);
431 }
432
433 public String getStateModelFactoryName()
434 {
435 return _record.getSimpleField(Attributes.STATE_MODEL_FACTORY_NAME.toString());
436 }
437
438 public void setStateModelFactoryName(String factoryName)
439 {
440 _record.setSimpleField(Attributes.STATE_MODEL_FACTORY_NAME.toString(), factoryName);
441 }
442
443
444 @Override
445 public int getBucketSize()
446 {
447 String bucketSizeStr = _record.getSimpleField(Attributes.BUCKET_SIZE.toString());
448 int bucketSize = 0;
449 if (bucketSizeStr != null)
450 {
451 try
452 {
453 bucketSize = Integer.parseInt(bucketSizeStr);
454 }
455 catch (NumberFormatException e)
456 {
457
458 }
459 }
460 return bucketSize;
461 }
462
463 @Override
464 public void setBucketSize(int bucketSize)
465 {
466 if (bucketSize > 0)
467 {
468 _record.setSimpleField(Attributes.BUCKET_SIZE.toString(), "" + bucketSize);
469 }
470 }
471
472 public void setAttribute(Attributes attr, String val)
473 {
474 _record.setSimpleField(attr.toString(), val);
475 }
476
477 public String getAttribute(Attributes attr)
478 {
479 return _record.getSimpleField(attr.toString());
480 }
481
482 public static Message createReplyMessage(Message srcMessage,
483 String instanceName,
484 Map<String, String> taskResultMap)
485 {
486 if (srcMessage.getCorrelationId() == null)
487 {
488 throw new HelixException("Message " + srcMessage.getMsgId()
489 + " does not contain correlation id");
490 }
491 Message replyMessage =
492 new Message(MessageType.TASK_REPLY, UUID.randomUUID().toString());
493 replyMessage.setCorrelationId(srcMessage.getCorrelationId());
494 replyMessage.setResultMap(taskResultMap);
495 replyMessage.setTgtSessionId("*");
496 replyMessage.setMsgState(MessageState.NEW);
497 replyMessage.setSrcName(instanceName);
498 if (srcMessage.getSrcInstanceType() == InstanceType.CONTROLLER)
499 {
500 replyMessage.setTgtName("Controller");
501 }
502 else
503 {
504 replyMessage.setTgtName(srcMessage.getMsgSrc());
505 }
506 return replyMessage;
507 }
508
509 public void addPartitionName(String partitionName)
510 {
511 if (_record.getListField(Attributes.PARTITION_NAME.toString()) == null)
512 {
513 _record.setListField(Attributes.PARTITION_NAME.toString(), new ArrayList<String>());
514 }
515
516 List<String> partitionNames = _record.getListField(Attributes.PARTITION_NAME.toString());
517 if (!partitionNames.contains(partitionName))
518 {
519 partitionNames.add(partitionName);
520 }
521 }
522
523 public List<String> getPartitionNames()
524 {
525 List<String> partitionNames =
526 _record.getListField(Attributes.PARTITION_NAME.toString());
527 if (partitionNames == null)
528 {
529 return Collections.emptyList();
530 }
531
532 return partitionNames;
533 }
534
535
536
537
538
539
540
541
542
543
544
545 public boolean isControlerMsg()
546 {
547 return getTgtName().equalsIgnoreCase("controller");
548 }
549
550 public PropertyKey getKey(Builder keyBuilder, String instanceName)
551 {
552 if (isControlerMsg())
553 {
554 return keyBuilder.controllerMessage(getId());
555 }
556 else
557 {
558 return keyBuilder.message(instanceName, getId());
559 }
560 }
561
562 private boolean isNullOrEmpty(String data)
563 {
564 return data == null || data.length() == 0 || data.trim().length() == 0;
565 }
566
567 @Override
568 public boolean isValid()
569 {
570
571
572
573 if (getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
574 {
575 boolean isNotValid =
576 isNullOrEmpty(getTgtName()) || isNullOrEmpty(getPartitionName())
577 || isNullOrEmpty(getResourceName()) || isNullOrEmpty(getStateModelDef())
578 || isNullOrEmpty(getToState()) || isNullOrEmpty(getStateModelFactoryName())
579 || isNullOrEmpty(getFromState());
580
581 return !isNotValid;
582 }
583
584 return true;
585 }
586 }