View Javadoc

1   package org.apache.helix.model;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.Comparator;
25  import java.util.Date;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.UUID;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  import org.apache.helix.HelixException;
32  import org.apache.helix.HelixProperty;
33  import org.apache.helix.InstanceType;
34  import org.apache.helix.PropertyKey;
35  import org.apache.helix.ZNRecord;
36  import org.apache.helix.PropertyKey.Builder;
37  
38  
39  /**
40   * 
41   */
42  
43  public class Message extends HelixProperty
44  {
45    public enum MessageType
46    {
47      STATE_TRANSITION,
48      SCHEDULER_MSG,
49      USER_DEFINE_MSG,
50      CONTROLLER_MSG,
51      TASK_REPLY,
52      NO_OP,
53      PARTICIPANT_ERROR_REPORT
54    };
55  
56    public enum Attributes
57    {
58      MSG_ID,
59      SRC_SESSION_ID,
60      TGT_SESSION_ID,
61      SRC_NAME,
62      TGT_NAME,
63      SRC_INSTANCE_TYPE,
64      MSG_STATE,
65      PARTITION_NAME,
66      RESOURCE_NAME,
67      FROM_STATE,
68      TO_STATE,
69      STATE_MODEL_DEF,
70      CREATE_TIMESTAMP,
71      READ_TIMESTAMP,
72      EXECUTE_START_TIMESTAMP,
73      MSG_TYPE,
74      MSG_SUBTYPE,
75      CORRELATION_ID,
76      MESSAGE_RESULT,
77      EXE_SESSION_ID,
78      TIMEOUT,
79      RETRY_COUNT,
80      STATE_MODEL_FACTORY_NAME,
81      BUCKET_SIZE,
82      PARENT_MSG_ID,   // used for group message mode
83      INNER_MESSAGE
84    }
85  
86    public enum MessageState
87    {
88      NEW, 
89      READ, // not used
90      UNPROCESSABLE // get exception when create handler
91    }
92  
93    public static final Comparator<Message> CREATE_TIME_COMPARATOR = new Comparator<Message>(){
94      @Override
95      public int compare(Message m1, Message m2)
96      {
97  //      long t1 = m1.getCreateTimeStamp();
98  //      long t2 = m2.getCreateTimeStamp();
99        return (int) (m1.getCreateTimeStamp() - m2.getCreateTimeStamp());
100     }
101   };
102   
103   // AtomicInteger _groupMsgCountDown = new AtomicInteger(1);
104   
105   public Message(MessageType type, String msgId)
106   {
107     this(type.toString(), msgId);
108   }
109 
110   public Message(String type, String msgId)
111   {
112     super(new ZNRecord(msgId));
113     _record.setSimpleField(Attributes.MSG_TYPE.toString(), type);
114     setMsgId(msgId);
115     setMsgState(MessageState.NEW);
116     _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(),
117                            "" + new Date().getTime());
118   }
119 
120   public Message(ZNRecord record)
121   {
122     super(record);
123     if (getMsgState() == null)
124     {
125       setMsgState(MessageState.NEW);
126     }
127     if (getCreateTimeStamp() == 0)
128     {
129       _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(),
130                              "" + new Date().getTime());
131     }
132   }
133 
134   public void setCreateTimeStamp(long timestamp)
135   {
136     _record.setSimpleField(Attributes.CREATE_TIMESTAMP.toString(), "" + timestamp);
137   }
138 
139   public Message(ZNRecord record, String id)
140   {
141     super(new ZNRecord(record, id));
142     setMsgId(id);
143   }
144 
145   public void setMsgSubType(String subType)
146   {
147     _record.setSimpleField(Attributes.MSG_SUBTYPE.toString(), subType);
148   }
149 
150   public String getMsgSubType()
151   {
152     return _record.getSimpleField(Attributes.MSG_SUBTYPE.toString());
153   }
154 
155   void setMsgType(MessageType type)
156   {
157     _record.setSimpleField(Attributes.MSG_TYPE.toString(), type.toString());
158   }
159 
160   public String getMsgType()
161   {
162     return _record.getSimpleField(Attributes.MSG_TYPE.toString());
163   }
164 
165   public String getTgtSessionId()
166   {
167     return _record.getSimpleField(Attributes.TGT_SESSION_ID.toString());
168   }
169 
170   public void setTgtSessionId(String tgtSessionId)
171   {
172     _record.setSimpleField(Attributes.TGT_SESSION_ID.toString(), tgtSessionId);
173   }
174 
175   public String getSrcSessionId()
176   {
177     return _record.getSimpleField(Attributes.SRC_SESSION_ID.toString());
178   }
179 
180   public void setSrcSessionId(String srcSessionId)
181   {
182     _record.setSimpleField(Attributes.SRC_SESSION_ID.toString(), srcSessionId);
183   }
184 
185   public String getExecutionSessionId()
186   {
187     return _record.getSimpleField(Attributes.EXE_SESSION_ID.toString());
188   }
189 
190   public void setExecuteSessionId(String exeSessionId)
191   {
192     _record.setSimpleField(Attributes.EXE_SESSION_ID.toString(), exeSessionId);
193   }
194 
195   public String getMsgSrc()
196   {
197     return _record.getSimpleField(Attributes.SRC_NAME.toString());
198   }
199 
200   public void setSrcInstanceType(InstanceType type)
201   {
202     _record.setSimpleField(Attributes.SRC_INSTANCE_TYPE.toString(), type.toString());
203   }
204 
205   public InstanceType getSrcInstanceType()
206   {
207     if (_record.getSimpleFields().containsKey(Attributes.SRC_INSTANCE_TYPE.toString()))
208     {
209       return InstanceType.valueOf(_record.getSimpleField(Attributes.SRC_INSTANCE_TYPE.toString()));
210     }
211     return InstanceType.PARTICIPANT;
212   }
213 
214   public void setSrcName(String msgSrc)
215   {
216     _record.setSimpleField(Attributes.SRC_NAME.toString(), msgSrc);
217   }
218 
219   public String getTgtName()
220   {
221     return _record.getSimpleField(Attributes.TGT_NAME.toString());
222   }
223 
224   public void setMsgState(MessageState msgState)
225   { // HACK: The "tolowerCase()" call is to make the change backward compatible
226     _record.setSimpleField(Attributes.MSG_STATE.toString(), msgState.toString()
227                                                                     .toLowerCase());
228   }
229 
230   public MessageState getMsgState()
231   {
232     // HACK: The "toUpperCase()" call is to make the change backward compatible
233     return MessageState.valueOf(_record.getSimpleField(Attributes.MSG_STATE.toString())
234                                        .toUpperCase());
235   }
236 
237   public void setPartitionName(String partitionName)
238   {
239     _record.setSimpleField(Attributes.PARTITION_NAME.toString(), partitionName);
240   }
241 
242   public String getMsgId()
243   {
244     return _record.getSimpleField(Attributes.MSG_ID.toString());
245   }
246 
247   public void setMsgId(String msgId)
248   {
249     _record.setSimpleField(Attributes.MSG_ID.toString(), msgId);
250   }
251 
252   public void setFromState(String state)
253   {
254     _record.setSimpleField(Attributes.FROM_STATE.toString(), state);
255   }
256 
257   public String getFromState()
258   {
259     return _record.getSimpleField(Attributes.FROM_STATE.toString());
260   }
261 
262   public void setToState(String state)
263   {
264     _record.setSimpleField(Attributes.TO_STATE.toString(), state);
265   }
266 
267   public String getToState()
268   {
269     return _record.getSimpleField(Attributes.TO_STATE.toString());
270   }
271 
272   public void setTgtName(String msgTgt)
273   {
274     _record.setSimpleField(Attributes.TGT_NAME.toString(), msgTgt);
275   }
276 
277   public Boolean getDebug()
278   {
279     return false;
280   }
281 
282   public Integer getGeneration()
283   {
284     return 1;
285   }
286 
287   public void setResourceName(String resourceName)
288   {
289     _record.setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName);
290   }
291 
292   public String getResourceName()
293   {
294     return _record.getSimpleField(Attributes.RESOURCE_NAME.toString());
295   }
296 
297   public String getPartitionName()
298   {
299     return _record.getSimpleField(Attributes.PARTITION_NAME.toString());
300   }
301 
302   public String getStateModelDef()
303   {
304     return _record.getSimpleField(Attributes.STATE_MODEL_DEF.toString());
305   }
306 
307   public void setStateModelDef(String stateModelDefName)
308   {
309     _record.setSimpleField(Attributes.STATE_MODEL_DEF.toString(), stateModelDefName);
310   }
311 
312   public void setReadTimeStamp(long time)
313   {
314     _record.setSimpleField(Attributes.READ_TIMESTAMP.toString(), "" + time);
315   }
316 
317   public void setExecuteStartTimeStamp(long time)
318   {
319     _record.setSimpleField(Attributes.EXECUTE_START_TIMESTAMP.toString(), "" + time);
320   }
321 
322   public long getReadTimeStamp()
323   {
324     String timestamp = _record.getSimpleField(Attributes.READ_TIMESTAMP.toString());
325     if (timestamp == null)
326     {
327       return 0;
328     }
329     try
330     {
331       return Long.parseLong(timestamp);
332     }
333     catch (Exception e)
334     {
335       return 0;
336     }
337 
338   }
339 
340   public long getExecuteStartTimeStamp()
341   {
342     String timestamp =
343         _record.getSimpleField(Attributes.EXECUTE_START_TIMESTAMP.toString());
344     if (timestamp == null)
345     {
346       return 0;
347     }
348     try
349     {
350       return Long.parseLong(timestamp);
351     }
352     catch (Exception e)
353     {
354       return 0;
355     }
356   }
357 
358   public long getCreateTimeStamp()
359   {
360     if (_record.getSimpleField(Attributes.CREATE_TIMESTAMP.toString()) == null)
361     {
362       return 0;
363     }
364     try
365     {
366       return Long.parseLong(_record.getSimpleField(Attributes.CREATE_TIMESTAMP.toString()));
367     }
368     catch (Exception e)
369     {
370       return 0;
371     }
372   }
373 
374   public void setCorrelationId(String correlationId)
375   {
376     _record.setSimpleField(Attributes.CORRELATION_ID.toString(), correlationId);
377   }
378 
379   public String getCorrelationId()
380   {
381     return _record.getSimpleField(Attributes.CORRELATION_ID.toString());
382   }
383 
384   public int getExecutionTimeout()
385   {
386     if (!_record.getSimpleFields().containsKey(Attributes.TIMEOUT.toString()))
387     {
388       return -1;
389     }
390     try
391     {
392       return Integer.parseInt(_record.getSimpleField(Attributes.TIMEOUT.toString()));
393     }
394     catch (Exception e)
395     {
396     }
397     return -1;
398   }
399 
400   public void setExecutionTimeout(int timeout)
401   {
402     _record.setSimpleField(Attributes.TIMEOUT.toString(), "" + timeout);
403   }
404 
405   public void setRetryCount(int retryCount)
406   {
407     _record.setSimpleField(Attributes.RETRY_COUNT.toString(), "" + retryCount);
408   }
409 
410   public int getRetryCount()
411   {
412     try
413     {
414       return Integer.parseInt(_record.getSimpleField(Attributes.RETRY_COUNT.toString()));
415     }
416     catch (Exception e)
417     {
418     }
419     // Default to 0, and there is no retry if timeout happens
420     return 0;
421   }
422 
423   public Map<String, String> getResultMap()
424   {
425     return _record.getMapField(Attributes.MESSAGE_RESULT.toString());
426   }
427 
428   public void setResultMap(Map<String, String> resultMap)
429   {
430     _record.setMapField(Attributes.MESSAGE_RESULT.toString(), resultMap);
431   }
432 
433   public String getStateModelFactoryName()
434   {
435     return _record.getSimpleField(Attributes.STATE_MODEL_FACTORY_NAME.toString());
436   }
437 
438   public void setStateModelFactoryName(String factoryName)
439   {
440     _record.setSimpleField(Attributes.STATE_MODEL_FACTORY_NAME.toString(), factoryName);
441   }
442 
443   // TODO: remove this. impl in HelixProperty
444   @Override
445   public int getBucketSize()
446   {
447     String bucketSizeStr = _record.getSimpleField(Attributes.BUCKET_SIZE.toString());
448     int bucketSize = 0;
449     if (bucketSizeStr != null)
450     {
451       try
452       {
453         bucketSize = Integer.parseInt(bucketSizeStr);
454       }
455       catch (NumberFormatException e)
456       {
457         // OK
458       }
459     }
460     return bucketSize;
461   }
462 
463   @Override
464   public void setBucketSize(int bucketSize)
465   {
466     if (bucketSize > 0)
467     {
468       _record.setSimpleField(Attributes.BUCKET_SIZE.toString(), "" + bucketSize);
469     }
470   }
471 
472   public void setAttribute(Attributes attr, String val)
473   {
474     _record.setSimpleField(attr.toString(), val);
475   }
476   
477   public String getAttribute(Attributes attr)
478   {
479     return _record.getSimpleField(attr.toString());
480   }
481   
482   public static Message createReplyMessage(Message srcMessage,
483                                            String instanceName,
484                                            Map<String, String> taskResultMap)
485   {
486     if (srcMessage.getCorrelationId() == null)
487     {
488       throw new HelixException("Message " + srcMessage.getMsgId()
489           + " does not contain correlation id");
490     }
491     Message replyMessage =
492         new Message(MessageType.TASK_REPLY, UUID.randomUUID().toString());
493     replyMessage.setCorrelationId(srcMessage.getCorrelationId());
494     replyMessage.setResultMap(taskResultMap);
495     replyMessage.setTgtSessionId("*");
496     replyMessage.setMsgState(MessageState.NEW);
497     replyMessage.setSrcName(instanceName);
498     if (srcMessage.getSrcInstanceType() == InstanceType.CONTROLLER)
499     {
500       replyMessage.setTgtName("Controller");
501     }
502     else
503     {
504       replyMessage.setTgtName(srcMessage.getMsgSrc());
505     }
506     return replyMessage;
507   }
508 
509   public void addPartitionName(String partitionName)
510   {
511     if (_record.getListField(Attributes.PARTITION_NAME.toString()) == null)
512     {
513       _record.setListField(Attributes.PARTITION_NAME.toString(), new ArrayList<String>());
514     }
515     
516     List<String> partitionNames = _record.getListField(Attributes.PARTITION_NAME.toString());
517     if (!partitionNames.contains(partitionName))
518     {
519       partitionNames.add(partitionName);
520     }
521   }
522 
523   public List<String> getPartitionNames()
524   {
525     List<String> partitionNames =
526         _record.getListField(Attributes.PARTITION_NAME.toString());
527     if (partitionNames == null)
528     {
529       return Collections.emptyList();
530     }
531 
532     return partitionNames;
533   }
534 
535 //  public AtomicInteger getGroupMsgCountDown()
536 //  {
537 //    return _groupMsgCountDown;
538 //  }
539 //  
540 //  public void setGroupMsgCountDown(AtomicInteger countDown)
541 //  {
542 //    _groupMsgCountDown = countDown;
543 //  }
544   
545   public boolean isControlerMsg()
546   {
547     return getTgtName().equalsIgnoreCase("controller");
548   }
549   
550   public PropertyKey getKey(Builder keyBuilder, String instanceName)
551   {
552     if (isControlerMsg())
553     {
554       return keyBuilder.controllerMessage(getId());
555     }
556     else
557     {
558       return keyBuilder.message(instanceName, getId());
559     }
560   }
561   
562   private boolean isNullOrEmpty(String data)
563   {
564     return data == null || data.length() == 0 || data.trim().length() == 0;
565   }
566 
567   @Override
568   public boolean isValid()
569   {
570     // TODO: refactor message to state transition message and task-message and
571     // implement this function separately
572 
573     if (getMsgType().equals(MessageType.STATE_TRANSITION.toString()))
574     {
575       boolean isNotValid =
576           isNullOrEmpty(getTgtName()) || isNullOrEmpty(getPartitionName())
577               || isNullOrEmpty(getResourceName()) || isNullOrEmpty(getStateModelDef())
578               || isNullOrEmpty(getToState()) || isNullOrEmpty(getStateModelFactoryName())
579               || isNullOrEmpty(getFromState());
580 
581       return !isNotValid;
582     }
583 
584     return true;
585   }
586 }