1 package org.apache.helix.util;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.PrintWriter;
23 import java.io.StringWriter;
24 import java.text.DateFormat;
25 import java.text.SimpleDateFormat;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.Date;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.TreeMap;
33 import java.util.UUID;
34 import java.util.concurrent.ConcurrentHashMap;
35
36 import org.apache.helix.HelixDataAccessor;
37 import org.apache.helix.HelixProperty;
38 import org.apache.helix.PropertyKey;
39 import org.apache.helix.ZNRecord;
40 import org.apache.helix.PropertyKey.Builder;
41 import org.apache.helix.model.Error;
42 import org.apache.helix.model.Message;
43 import org.apache.helix.model.StatusUpdate;
44 import org.apache.helix.model.Message.MessageType;
45 import org.apache.log4j.Logger;
46
47
48
49
50
51
52
53
54
55 public class StatusUpdateUtil
56 {
57 static Logger _logger = Logger.getLogger(StatusUpdateUtil.class);
58
59 public static class Transition implements Comparable<Transition>
60 {
61 private final String _msgID;
62 private final long _timeStamp;
63 private final String _from;
64 private final String _to;
65
66 public Transition(String msgID, long timeStamp, String from, String to)
67 {
68 this._msgID = msgID;
69 this._timeStamp = timeStamp;
70 this._from = from;
71 this._to = to;
72 }
73
74 @Override
75 public int compareTo(Transition t)
76 {
77 if (_timeStamp < t._timeStamp)
78 return -1;
79 else if (_timeStamp > t._timeStamp)
80 return 1;
81 else
82 return 0;
83 }
84
85 public boolean equals(Transition t)
86 {
87 return (_timeStamp == t._timeStamp && _from.equals(t._from) && _to.equals(t._to));
88 }
89
90 public String getFromState()
91 {
92 return _from;
93 }
94
95 public String getToState()
96 {
97 return _to;
98 }
99
100 public String getMsgID()
101 {
102 return _msgID;
103 }
104
105 @Override
106 public String toString()
107 {
108 return _msgID + ":" + _timeStamp + ":" + _from + "->" + _to;
109 }
110 }
111
112 public static enum TaskStatus
113 {
114 UNKNOWN, NEW, SCHEDULED, INVOKING, COMPLETED, FAILED
115 }
116
117 public static class StatusUpdateContents
118 {
119 private final List<Transition> _transitions;
120 private final Map<String, TaskStatus> _taskMessages;
121
122 private StatusUpdateContents(List<Transition> transitions,
123 Map<String, TaskStatus> taskMessages)
124 {
125 this._transitions = transitions;
126 this._taskMessages = taskMessages;
127 }
128
129 public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
130 String instance,
131 String resourceGroup,
132 String partition)
133 {
134 return getStatusUpdateContents(accessor, instance, resourceGroup, null, partition);
135 }
136
137
138
139
140
141
142 public static StatusUpdateContents getStatusUpdateContents(HelixDataAccessor accessor,
143 String instance,
144 String resourceGroup,
145 String sessionID,
146 String partition)
147 {
148 Builder keyBuilder = accessor.keyBuilder();
149
150 List<ZNRecord> instances =
151 HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
152 List<ZNRecord> partitionRecords = new ArrayList<ZNRecord>();
153 for (ZNRecord znRecord : instances)
154 {
155 String instanceName = znRecord.getId();
156 if (!instanceName.equals(instance))
157 {
158 continue;
159 }
160
161 List<String> sessions = accessor.getChildNames(keyBuilder.sessions(instanceName));
162 for (String session : sessions)
163 {
164 if (sessionID != null && !session.equals(sessionID))
165 {
166 continue;
167 }
168
169 List<String> resourceGroups =
170 accessor.getChildNames(keyBuilder.stateTransitionStatus(instanceName,
171 session));
172 for (String resourceGroupName : resourceGroups)
173 {
174 if (!resourceGroupName.equals(resourceGroup))
175 {
176 continue;
177 }
178
179 List<String> partitionStrings =
180 accessor.getChildNames(keyBuilder.stateTransitionStatus(instanceName,
181 session,
182 resourceGroupName));
183
184 for (String partitionString : partitionStrings)
185 {
186 ZNRecord partitionRecord =
187 accessor.getProperty(keyBuilder.stateTransitionStatus(instanceName,
188 session,
189 resourceGroupName,
190 partitionString))
191 .getRecord();
192 if (!partitionString.equals(partition))
193 {
194 continue;
195 }
196 partitionRecords.add(partitionRecord);
197 }
198 }
199 }
200 }
201
202 return new StatusUpdateContents(getSortedTransitions(partitionRecords),
203 getTaskMessages(partitionRecords));
204 }
205
206 public List<Transition> getTransitions()
207 {
208 return _transitions;
209 }
210
211 public Map<String, TaskStatus> getTaskMessages()
212 {
213 return _taskMessages;
214 }
215
216
217
218
219 private static List<Transition> getSortedTransitions(List<ZNRecord> partitionRecords)
220 {
221 List<Transition> transitions = new ArrayList<Transition>();
222 for (ZNRecord partition : partitionRecords)
223 {
224 Map<String, Map<String, String>> mapFields = partition.getMapFields();
225 for (String key : mapFields.keySet())
226 {
227 if (key.startsWith("MESSAGE"))
228 {
229 Map<String, String> m = mapFields.get(key);
230 long createTimeStamp = 0;
231 try
232 {
233 createTimeStamp = Long.parseLong(m.get("CREATE_TIMESTAMP"));
234 }
235 catch (Exception e)
236 {
237 }
238 transitions.add(new Transition(m.get("MSG_ID"),
239 createTimeStamp,
240 m.get("FROM_STATE"),
241 m.get("TO_STATE")));
242 }
243 }
244 }
245 Collections.sort(transitions);
246 return transitions;
247 }
248
249 private static Map<String, TaskStatus> getTaskMessages(List<ZNRecord> partitionRecords)
250 {
251 Map<String, TaskStatus> taskMessages = new HashMap<String, TaskStatus>();
252 for (ZNRecord partition : partitionRecords)
253 {
254 Map<String, Map<String, String>> mapFields = partition.getMapFields();
255
256
257 for (String key : mapFields.keySet())
258 {
259 if (key.contains("STATE_TRANSITION"))
260 {
261 Map<String, String> m = mapFields.get(key);
262 String id = m.get("MSG_ID");
263 String statusString = m.get("AdditionalInfo");
264 TaskStatus status = TaskStatus.UNKNOWN;
265 if (statusString.contains("scheduled"))
266 status = TaskStatus.SCHEDULED;
267 else if (statusString.contains("invoking"))
268 status = TaskStatus.INVOKING;
269 else if (statusString.contains("completed"))
270 status = TaskStatus.COMPLETED;
271
272 taskMessages.put(id, status);
273 }
274 }
275 }
276 return taskMessages;
277 }
278 }
279
/**
 * Severity attached to a status-update record; its {@code toString()} value
 * is embedded in the key of the record's map field.
 */
public enum Level
{
  HELIX_ERROR,
  HELIX_WARNING,
  HELIX_INFO
}
284
285
286
287
288
289
290 public ZNRecord createEmptyStatusUpdateRecord(String id)
291 {
292 return new ZNRecord(id);
293 }
294
295
296
297
298
299
300
301 ZNRecord createMessageLogRecord(Message message)
302 {
303 ZNRecord result = new ZNRecord(getStatusUpdateRecordName(message));
304 String mapFieldKey = "MESSAGE " + message.getMsgId();
305 result.setMapField(mapFieldKey, new TreeMap<String, String>());
306
307
308
309 for (String simpleFieldKey : message.getRecord().getSimpleFields().keySet())
310 {
311 result.getMapField(mapFieldKey).put(simpleFieldKey,
312 message.getRecord()
313 .getSimpleField(simpleFieldKey));
314 }
315 if (message.getResultMap() != null)
316 {
317 result.setMapField("MessageResult", message.getResultMap());
318 }
319 return result;
320 }
321
322 Map<String, String> _recordedMessages = new ConcurrentHashMap<String, String>();
323
324
325
326
327
328
329
330
331
332
333
334
335
336 public ZNRecord createMessageStatusUpdateRecord(Message message,
337 Level level,
338 Class classInfo,
339 String additionalInfo)
340 {
341 ZNRecord result = createEmptyStatusUpdateRecord(getStatusUpdateRecordName(message));
342 Map<String, String> contentMap = new TreeMap<String, String>();
343
344 contentMap.put("Message state", message.getMsgState().toString());
345 contentMap.put("AdditionalInfo", additionalInfo);
346 contentMap.put("Class", classInfo.toString());
347 contentMap.put("MSG_ID", message.getMsgId());
348
349 DateFormat formatter = new SimpleDateFormat("yyyyMMdd-HHmmss.SSSSSS");
350 String time = formatter.format(new Date());
351
352 String id =
353 String.format("%4s %26s ", level.toString(), time)
354 + getRecordIdForMessage(message);
355
356 result.setMapField(id, contentMap);
357
358 return result;
359 }
360
361 String getRecordIdForMessage(Message message)
362 {
363 if (message.getMsgType().equals(MessageType.STATE_TRANSITION))
364 {
365 return message.getPartitionName() + " Trans:" + message.getFromState().charAt(0)
366 + "->" + message.getToState().charAt(0) + " " + UUID.randomUUID().toString();
367 }
368 else
369 {
370 return message.getMsgType() + " " + UUID.randomUUID().toString();
371 }
372 }
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389 public void logMessageStatusUpdateRecord(Message message,
390 Level level,
391 Class classInfo,
392 String additionalInfo,
393 HelixDataAccessor accessor)
394 {
395 try
396 {
397 ZNRecord record =
398 createMessageStatusUpdateRecord(message, level, classInfo, additionalInfo);
399 publishStatusUpdateRecord(record, message, level, accessor);
400 }
401 catch (Exception e)
402 {
403 _logger.error("Exception while logging status update", e);
404 }
405 }
406
407 public void logError(Message message,
408 Class classInfo,
409 String additionalInfo,
410 HelixDataAccessor accessor)
411 {
412 logMessageStatusUpdateRecord(message,
413 Level.HELIX_ERROR,
414 classInfo,
415 additionalInfo,
416 accessor);
417 }
418
419 public void logError(Message message,
420 Class classInfo,
421 Exception e,
422 String additionalInfo,
423 HelixDataAccessor accessor)
424 {
425 StringWriter sw = new StringWriter();
426 PrintWriter pw = new PrintWriter(sw);
427 e.printStackTrace(pw);
428 logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, additionalInfo
429 + sw.toString(), accessor);
430 }
431
432 public void logInfo(Message message,
433 Class classInfo,
434 String additionalInfo,
435 HelixDataAccessor accessor)
436 {
437 logMessageStatusUpdateRecord(message,
438 Level.HELIX_INFO,
439 classInfo,
440 additionalInfo,
441 accessor);
442 }
443
444 public void logWarning(Message message,
445 Class classInfo,
446 String additionalInfo,
447 HelixDataAccessor accessor)
448 {
449 logMessageStatusUpdateRecord(message,
450 Level.HELIX_WARNING,
451 classInfo,
452 additionalInfo,
453 accessor);
454 }
455
456
457
458
459
460
461
462
463
464
465
466
467
468 void publishStatusUpdateRecord(ZNRecord record,
469 Message message,
470 Level level,
471 HelixDataAccessor accessor)
472 {
473 String instanceName = message.getTgtName();
474 String statusUpdateSubPath = getStatusUpdateSubPath(message);
475 String statusUpdateKey = getStatusUpdateKey(message);
476 String sessionId = message.getExecutionSessionId();
477 if (sessionId == null)
478 {
479 sessionId = message.getTgtSessionId();
480 }
481 if (sessionId == null)
482 {
483 sessionId = "*";
484 }
485
486 Builder keyBuilder = accessor.keyBuilder();
487 if (!_recordedMessages.containsKey(message.getMsgId()))
488 {
489
490 if (instanceName.equalsIgnoreCase("Controller"))
491 {
492 accessor.updateProperty(keyBuilder.controllerTaskStatus(statusUpdateSubPath,
493 statusUpdateKey),
494 new StatusUpdate(createMessageLogRecord(message)));
495
496 }
497 else
498 {
499
500 PropertyKey propertyKey =
501 keyBuilder.stateTransitionStatus(instanceName,
502 sessionId,
503 statusUpdateSubPath,
504 statusUpdateKey);
505
506 ZNRecord statusUpdateRecord = createMessageLogRecord(message);
507
508
509
510 if(_logger.isTraceEnabled()){
511 _logger.trace("StatusUpdate path:" + propertyKey.getPath() + ", updates:"
512 + statusUpdateRecord);
513 }
514 accessor.updateProperty(propertyKey, new StatusUpdate(statusUpdateRecord));
515
516 }
517 _recordedMessages.put(message.getMsgId(), message.getMsgId());
518 }
519
520 if (instanceName.equalsIgnoreCase("Controller"))
521 {
522 accessor.updateProperty(keyBuilder.controllerTaskStatus(statusUpdateSubPath,
523 statusUpdateKey),
524 new StatusUpdate(record));
525 }
526 else
527 {
528
529 PropertyKey propertyKey =
530 keyBuilder.stateTransitionStatus(instanceName,
531 sessionId,
532 statusUpdateSubPath,
533 statusUpdateKey);
534
535
536 if(_logger.isTraceEnabled()){
537 _logger.trace("StatusUpdate path:" + propertyKey.getPath() + ", updates:" + record);
538 }
539 accessor.updateProperty(propertyKey, new StatusUpdate(record));
540 }
541
542
543 if (Level.HELIX_ERROR == level)
544 {
545 publishErrorRecord(record, message, accessor);
546 }
547 }
548
549 private String getStatusUpdateKey(Message message)
550 {
551 if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
552 {
553 return message.getPartitionName();
554 }
555 return message.getMsgId();
556 }
557
558
559
560
561
562 String getStatusUpdateSubPath(Message message)
563 {
564 if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
565 {
566 return message.getResourceName();
567 }
568 else
569 {
570 return message.getMsgType();
571 }
572 }
573
574 String getStatusUpdateRecordName(Message message)
575 {
576 if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.toString()))
577 {
578 return message.getTgtSessionId() + "__" + message.getResourceName();
579 }
580 return message.getMsgId();
581 }
582
583
584
585
586
587
588
589
590
591
592
593 void publishErrorRecord(ZNRecord record, Message message, HelixDataAccessor accessor)
594 {
595 String instanceName = message.getTgtName();
596 String statusUpdateSubPath = getStatusUpdateSubPath(message);
597 String statusUpdateKey = getStatusUpdateKey(message);
598 String sessionId = message.getExecutionSessionId();
599 if (sessionId == null)
600 {
601 sessionId = message.getTgtSessionId();
602 }
603 if (sessionId == null)
604 {
605 sessionId = "*";
606 }
607
608 Builder keyBuilder = accessor.keyBuilder();
609
610
611 if (instanceName.equalsIgnoreCase("controller"))
612 {
613
614
615
616
617 accessor.setProperty(keyBuilder.controllerTaskError(statusUpdateSubPath),
618 new Error(record));
619 }
620 else
621 {
622
623
624
625
626
627
628 accessor.updateProperty(keyBuilder.stateTransitionError(instanceName,
629 sessionId,
630 statusUpdateSubPath,
631 statusUpdateKey),
632 new Error(record));
633
634 }
635 }
636 }