View Javadoc

1   package org.apache.helix.util;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.PrintWriter;
23  import java.io.StringWriter;
24  import java.text.DateFormat;
25  import java.text.SimpleDateFormat;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.Date;
29  import java.util.HashMap;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.TreeMap;
33  import java.util.UUID;
34  import java.util.concurrent.ConcurrentHashMap;
35  
36  import org.apache.helix.HelixDataAccessor;
37  import org.apache.helix.HelixProperty;
38  import org.apache.helix.PropertyKey;
39  import org.apache.helix.ZNRecord;
40  import org.apache.helix.PropertyKey.Builder;
41  import org.apache.helix.model.Error;
42  import org.apache.helix.model.Message;
43  import org.apache.helix.model.StatusUpdate;
44  import org.apache.helix.model.Message.MessageType;
45  import org.apache.log4j.Logger;
46  
47  
48  /**
49   * Util class to create statusUpdates ZK records and error ZK records. These message
50   * records are for diagnostics only, and they are stored on the "StatusUpdates" and
51   * "errors" ZNodes in the zookeeper instances.
52   * 
53   * 
54   * */
55  public class StatusUpdateUtil
56  {
57    static Logger _logger = Logger.getLogger(StatusUpdateUtil.class);
58  
59    public static class Transition implements Comparable<Transition>
60    {
61      private final String _msgID;
62      private final long   _timeStamp;
63      private final String _from;
64      private final String _to;
65  
66      public Transition(String msgID, long timeStamp, String from, String to)
67      {
68        this._msgID = msgID;
69        this._timeStamp = timeStamp;
70        this._from = from;
71        this._to = to;
72      }
73  
74      @Override
75      public int compareTo(Transition t)
76      {
77        if (_timeStamp < t._timeStamp)
78          return -1;
79        else if (_timeStamp > t._timeStamp)
80          return 1;
81        else
82          return 0;
83      }
84  
85      public boolean equals(Transition t)
86      {
87        return (_timeStamp == t._timeStamp && _from.equals(t._from) && _to.equals(t._to));
88      }
89  
90      public String getFromState()
91      {
92        return _from;
93      }
94  
95      public String getToState()
96      {
97        return _to;
98      }
99  
100     public String getMsgID()
101     {
102       return _msgID;
103     }
104 
105     @Override
106     public String toString()
107     {
108       return _msgID + ":" + _timeStamp + ":" + _from + "->" + _to;
109     }
110   }
111 
112   public static enum TaskStatus
113   {
114     UNKNOWN, NEW, SCHEDULED, INVOKING, COMPLETED, FAILED
115   }
116 
117   public static class StatusUpdateContents
118   {
119     private final List<Transition>        _transitions;
120     private final Map<String, TaskStatus> _taskMessages;
121 
122     private StatusUpdateContents(List<Transition> transitions,
123                                  Map<String, TaskStatus> taskMessages)
124     {
125       this._transitions = transitions;
126       this._taskMessages = taskMessages;
127     }
128 
129     public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
130                                                                String instance,
131                                                                String resourceGroup,
132                                                                String partition)
133     {
134       return getStatusUpdateContents(accessor, instance, resourceGroup, null, partition);
135     }
136 
137     // TODO: We should build a map and return the key instead of searching
138     // everytime
139     // for an (instance, resourceGroup, session, partition) tuple.
140     // But such a map is very similar to what exists in ZNRecord
141     // passing null for sessionID results in searching across all sessions
142     public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
143                                                                String instance,
144                                                                String resourceGroup,
145                                                                String sessionID,
146                                                                String partition)
147     {
148       Builder keyBuilder = accessor.keyBuilder();
149 
150       List<ZNRecord> instances =
151           HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
152       List<ZNRecord> partitionRecords = new ArrayList<ZNRecord>();
153       for (ZNRecord znRecord : instances)
154       {
155         String instanceName = znRecord.getId();
156         if (!instanceName.equals(instance))
157         {
158           continue;
159         }
160 
161         List<String> sessions = accessor.getChildNames(keyBuilder.sessions(instanceName));
162         for (String session : sessions)
163         {
164           if (sessionID != null && !session.equals(sessionID))
165           {
166             continue;
167           }
168 
169           List<String> resourceGroups =
170               accessor.getChildNames(keyBuilder.stateTransitionStatus(instanceName,
171                                                                       session));
172           for (String resourceGroupName : resourceGroups)
173           {
174             if (!resourceGroupName.equals(resourceGroup))
175             {
176               continue;
177             }
178 
179             List<String> partitionStrings =
180                 accessor.getChildNames(keyBuilder.stateTransitionStatus(instanceName,
181                                                                         session,
182                                                                         resourceGroupName));
183 
184             for (String partitionString : partitionStrings)
185             {
186               ZNRecord partitionRecord =
187                   accessor.getProperty(keyBuilder.stateTransitionStatus(instanceName,
188                                                                         session,
189                                                                         resourceGroupName,
190                                                                         partitionString))
191                           .getRecord();
192               if (!partitionString.equals(partition))
193               {
194                 continue;
195               }
196               partitionRecords.add(partitionRecord);
197             }
198           }
199         }
200       }
201 
202       return new StatusUpdateContents(getSortedTransitions(partitionRecords),
203                                       getTaskMessages(partitionRecords));
204     }
205 
206     public List<Transition> getTransitions()
207     {
208       return _transitions;
209     }
210 
211     public Map<String, TaskStatus> getTaskMessages()
212     {
213       return _taskMessages;
214     }
215 
216     // input: List<ZNRecord> corresponding to (instance, database,
217     // partition) tuples across all sessions
218     // return list of transitions sorted from earliest to latest
219     private static List<Transition> getSortedTransitions(List<ZNRecord> partitionRecords)
220     {
221       List<Transition> transitions = new ArrayList<Transition>();
222       for (ZNRecord partition : partitionRecords)
223       {
224         Map<String, Map<String, String>> mapFields = partition.getMapFields();
225         for (String key : mapFields.keySet())
226         {
227           if (key.startsWith("MESSAGE"))
228           {
229             Map<String, String> m = mapFields.get(key);
230             long createTimeStamp = 0;
231             try
232             {
233               createTimeStamp = Long.parseLong(m.get("CREATE_TIMESTAMP"));
234             }
235             catch (Exception e)
236             {
237             }
238             transitions.add(new Transition(m.get("MSG_ID"),
239                                            createTimeStamp,
240                                            m.get("FROM_STATE"),
241                                            m.get("TO_STATE")));
242           }
243         }
244       }
245       Collections.sort(transitions);
246       return transitions;
247     }
248 
249     private static Map<String, TaskStatus> getTaskMessages(List<ZNRecord> partitionRecords)
250     {
251       Map<String, TaskStatus> taskMessages = new HashMap<String, TaskStatus>();
252       for (ZNRecord partition : partitionRecords)
253       {
254         Map<String, Map<String, String>> mapFields = partition.getMapFields();
255         // iterate over the task status updates in the order they occurred
256         // so that the last status can be recorded
257         for (String key : mapFields.keySet())
258         {
259           if (key.contains("STATE_TRANSITION"))
260           {
261             Map<String, String> m = mapFields.get(key);
262             String id = m.get("MSG_ID");
263             String statusString = m.get("AdditionalInfo");
264             TaskStatus status = TaskStatus.UNKNOWN;
265             if (statusString.contains("scheduled"))
266               status = TaskStatus.SCHEDULED;
267             else if (statusString.contains("invoking"))
268               status = TaskStatus.INVOKING;
269             else if (statusString.contains("completed"))
270               status = TaskStatus.COMPLETED;
271 
272             taskMessages.put(id, status);
273           }
274         }
275       }
276       return taskMessages;
277     }
278   }
279 
280   public enum Level
281   {
282     HELIX_ERROR, HELIX_WARNING, HELIX_INFO
283   }
284 
285   /**
286    * Creates an empty ZNRecord as the statusUpdate/error record
287    * 
288    * @param id
289    */
290   public ZNRecord createEmptyStatusUpdateRecord(String id)
291   {
292     return new ZNRecord(id);
293   }
294 
295   /**
296    * Create a ZNRecord for a message, which stores the content of the message (stored in
297    * simple fields) into the ZNRecord mapFields. In this way, the message update can be
298    * merged with the previous status update record in the zookeeper. See ZNRecord.merge()
299    * for more details.
300    * */
301   ZNRecord createMessageLogRecord(Message message)
302   {
303     ZNRecord result = new ZNRecord(getStatusUpdateRecordName(message));
304     String mapFieldKey = "MESSAGE " + message.getMsgId();
305     result.setMapField(mapFieldKey, new TreeMap<String, String>());
306 
307     // Store all the simple fields of the message in the new ZNRecord's map
308     // field.
309     for (String simpleFieldKey : message.getRecord().getSimpleFields().keySet())
310     {
311       result.getMapField(mapFieldKey).put(simpleFieldKey,
312                                           message.getRecord()
313                                                  .getSimpleField(simpleFieldKey));
314     }
315     if (message.getResultMap() != null)
316     {
317       result.setMapField("MessageResult", message.getResultMap());
318     }
319     return result;
320   }
321 
322   Map<String, String> _recordedMessages = new ConcurrentHashMap<String, String>();
323 
324   /**
325    * Create a statusupdate that is related to a cluster manager message.
326    * 
327    * @param message
328    *          the related cluster manager message
329    * @param level
330    *          the error level
331    * @param classInfo
332    *          class info about the class that reports the status update
333    * @param additional
334    *          info the additional debug information
335    */
336   public ZNRecord createMessageStatusUpdateRecord(Message message,
337                                                   Level level,
338                                                   Class classInfo,
339                                                   String additionalInfo)
340   {
341     ZNRecord result = createEmptyStatusUpdateRecord(getStatusUpdateRecordName(message));
342     Map<String, String> contentMap = new TreeMap<String, String>();
343 
344     contentMap.put("Message state", message.getMsgState().toString());
345     contentMap.put("AdditionalInfo", additionalInfo);
346     contentMap.put("Class", classInfo.toString());
347     contentMap.put("MSG_ID", message.getMsgId());
348 
349     DateFormat formatter = new SimpleDateFormat("yyyyMMdd-HHmmss.SSSSSS");
350     String time = formatter.format(new Date());
351 
352     String id =
353         String.format("%4s %26s ", level.toString(), time)
354             + getRecordIdForMessage(message);
355 
356     result.setMapField(id, contentMap);
357 
358     return result;
359   }
360 
361   String getRecordIdForMessage(Message message)
362   {
363     if (message.getMsgType().equals(MessageType.STATE_TRANSITION))
364     {
365       return message.getPartitionName() + " Trans:" + message.getFromState().charAt(0)
366           + "->" + message.getToState().charAt(0) + "  " + UUID.randomUUID().toString();
367     }
368     else
369     {
370       return message.getMsgType() + " " + UUID.randomUUID().toString();
371     }
372   }
373 
374   /**
375    * Create a statusupdate that is related to a cluster manager message, then record it to
376    * the zookeeper store.
377    * 
378    * @param message
379    *          the related cluster manager message
380    * @param level
381    *          the error level
382    * @param classInfo
383    *          class info about the class that reports the status update
384    * @param additional
385    *          info the additional debug information
386    * @param accessor
387    *          the zookeeper data accessor that writes the status update to zookeeper
388    */
389   public void logMessageStatusUpdateRecord(Message message,
390                                            Level level,
391                                            Class classInfo,
392                                            String additionalInfo,
393                                            HelixDataAccessor accessor)
394   {
395     try
396     {
397       ZNRecord record =
398           createMessageStatusUpdateRecord(message, level, classInfo, additionalInfo);
399       publishStatusUpdateRecord(record, message, level, accessor);
400     }
401     catch (Exception e)
402     {
403       _logger.error("Exception while logging status update", e);
404     }
405   }
406 
407   public void logError(Message message,
408                        Class classInfo,
409                        String additionalInfo,
410                        HelixDataAccessor accessor)
411   {
412     logMessageStatusUpdateRecord(message,
413                                  Level.HELIX_ERROR,
414                                  classInfo,
415                                  additionalInfo,
416                                  accessor);
417   }
418 
419   public void logError(Message message,
420                        Class classInfo,
421                        Exception e,
422                        String additionalInfo,
423                        HelixDataAccessor accessor)
424   {
425     StringWriter sw = new StringWriter();
426     PrintWriter pw = new PrintWriter(sw);
427     e.printStackTrace(pw);
428     logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, additionalInfo
429         + sw.toString(), accessor);
430   }
431 
432   public void logInfo(Message message,
433                       Class classInfo,
434                       String additionalInfo,
435                       HelixDataAccessor accessor)
436   {
437     logMessageStatusUpdateRecord(message,
438                                  Level.HELIX_INFO,
439                                  classInfo,
440                                  additionalInfo,
441                                  accessor);
442   }
443 
444   public void logWarning(Message message,
445                          Class classInfo,
446                          String additionalInfo,
447                          HelixDataAccessor accessor)
448   {
449     logMessageStatusUpdateRecord(message,
450                                  Level.HELIX_WARNING,
451                                  classInfo,
452                                  additionalInfo,
453                                  accessor);
454   }
455 
456   /**
457    * Write a status update record to zookeeper to the zookeeper store.
458    * 
459    * @param record
460    *          the status update record
461    * @param message
462    *          the message to be logged
463    * @param level
464    *          the error level of the message update
465    * @param accessor
466    *          the zookeeper data accessor that writes the status update to zookeeper
467    */
468   void publishStatusUpdateRecord(ZNRecord record,
469                                  Message message,
470                                  Level level,
471                                  HelixDataAccessor accessor)
472   {
473     String instanceName = message.getTgtName();
474     String statusUpdateSubPath = getStatusUpdateSubPath(message);
475     String statusUpdateKey = getStatusUpdateKey(message);
476     String sessionId = message.getExecutionSessionId();
477     if (sessionId == null)
478     {
479       sessionId = message.getTgtSessionId();
480     }
481     if (sessionId == null)
482     {
483       sessionId = "*";
484     }
485 
486     Builder keyBuilder = accessor.keyBuilder();
487     if (!_recordedMessages.containsKey(message.getMsgId()))
488     {
489       // TODO instanceName of a controller might be any string
490       if (instanceName.equalsIgnoreCase("Controller"))
491       {
492         accessor.updateProperty(keyBuilder.controllerTaskStatus(statusUpdateSubPath,
493                                                                 statusUpdateKey),
494                                 new StatusUpdate(createMessageLogRecord(message)));
495 
496       }
497       else
498       {
499         
500         PropertyKey propertyKey =
501             keyBuilder.stateTransitionStatus(instanceName,
502                                              sessionId,
503                                              statusUpdateSubPath,
504                                              statusUpdateKey);
505 
506         ZNRecord statusUpdateRecord = createMessageLogRecord(message);
507 
508         // For now write participant StatusUpdates to log4j. 
509         // we are using restlet as another data channel to report to controller.
510         if(_logger.isTraceEnabled()){
511            _logger.trace("StatusUpdate path:" + propertyKey.getPath() + ", updates:"
512               + statusUpdateRecord);
513         }
514         accessor.updateProperty(propertyKey, new StatusUpdate(statusUpdateRecord));
515         
516       }
517       _recordedMessages.put(message.getMsgId(), message.getMsgId());
518     }
519 
520     if (instanceName.equalsIgnoreCase("Controller"))
521     {
522       accessor.updateProperty(keyBuilder.controllerTaskStatus(statusUpdateSubPath,
523                                                               statusUpdateKey),
524                               new StatusUpdate(record));
525     }
526     else
527     {
528       
529       PropertyKey propertyKey =
530           keyBuilder.stateTransitionStatus(instanceName,
531                                            sessionId,
532                                            statusUpdateSubPath,
533                                            statusUpdateKey);
534       // For now write participant StatusUpdates to log4j. 
535       // we are using restlet as another data channel to report to controller.
536       if(_logger.isTraceEnabled()){
537         _logger.trace("StatusUpdate path:" + propertyKey.getPath() + ", updates:" + record);
538       }
539       accessor.updateProperty(propertyKey, new StatusUpdate(record));
540     }
541 
542     // If the error level is ERROR, also write the record to "ERROR" ZNode
543     if (Level.HELIX_ERROR == level)
544     {
545       publishErrorRecord(record, message, accessor);
546     }
547   }
548 
549   private String getStatusUpdateKey(Message message)
550   {
551     if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
552     {
553       return message.getPartitionName();
554     }
555     return message.getMsgId();
556   }
557 
558   /**
559    * Generate the sub-path under STATUSUPDATE or ERROR path for a status update
560    * 
561    */
562   String getStatusUpdateSubPath(Message message)
563   {
564     if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
565     {
566       return message.getResourceName();
567     }
568     else
569     {
570       return message.getMsgType();
571     }
572   }
573 
574   String getStatusUpdateRecordName(Message message)
575   {
576     if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
577     {
578       return message.getTgtSessionId() + "__" + message.getResourceName();
579     }
580     return message.getMsgId();
581   }
582 
583   /**
584    * Write an error record to zookeeper to the zookeeper store.
585    * 
586    * @param record
587    *          the status update record
588    * @param message
589    *          the message to be logged
590    * @param accessor
591    *          the zookeeper data accessor that writes the status update to zookeeper
592    */
593   void publishErrorRecord(ZNRecord record, Message message, HelixDataAccessor accessor)
594   {
595     String instanceName = message.getTgtName();
596     String statusUpdateSubPath = getStatusUpdateSubPath(message);
597     String statusUpdateKey = getStatusUpdateKey(message);
598     String sessionId = message.getExecutionSessionId();
599     if (sessionId == null)
600     {
601       sessionId = message.getTgtSessionId();
602     }
603     if (sessionId == null)
604     {
605       sessionId = "*";
606     }
607 
608     Builder keyBuilder = accessor.keyBuilder();
609 
610     // TODO remove the hard code: "controller"
611     if (instanceName.equalsIgnoreCase("controller"))
612     {
613       // TODO need to fix: ERRORS_CONTROLLER doesn't have a form of
614       // ../{sessionId}/{subPath}
615       // accessor.setProperty(PropertyType.ERRORS_CONTROLLER, record,
616       // statusUpdateSubPath);
617       accessor.setProperty(keyBuilder.controllerTaskError(statusUpdateSubPath),
618                            new Error(record));
619     }
620     else
621     {
622       // accessor.updateProperty(PropertyType.ERRORS,
623       // record,
624       // instanceName,
625       // sessionId,
626       // statusUpdateSubPath,
627       // statusUpdateKey);
628       accessor.updateProperty(keyBuilder.stateTransitionError(instanceName,
629                                                               sessionId,
630                                                               statusUpdateSubPath,
631                                                               statusUpdateKey),
632                               new Error(record));
633 
634     }
635   }
636 }