1 package org.apache.helix.controller.stages;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.text.SimpleDateFormat;
23 import java.util.Arrays;
24 import java.util.Date;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28
29 import org.apache.helix.HelixDataAccessor;
30 import org.apache.helix.HelixManager;
31 import org.apache.helix.HelixProperty;
32 import org.apache.helix.PropertyType;
33 import org.apache.helix.ZNRecord;
34 import org.apache.helix.PropertyKey.Builder;
35 import org.apache.helix.alerts.AlertParser;
36 import org.apache.helix.alerts.AlertProcessor;
37 import org.apache.helix.alerts.AlertValueAndStatus;
38 import org.apache.helix.alerts.AlertsHolder;
39 import org.apache.helix.alerts.ExpressionParser;
40 import org.apache.helix.alerts.StatsHolder;
41 import org.apache.helix.alerts.Tuple;
42 import org.apache.helix.controller.pipeline.AbstractBaseStage;
43 import org.apache.helix.controller.pipeline.StageContext;
44 import org.apache.helix.controller.pipeline.StageException;
45 import org.apache.helix.healthcheck.StatHealthReportProvider;
46 import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
47 import org.apache.helix.model.AlertHistory;
48 import org.apache.helix.model.HealthStat;
49 import org.apache.helix.model.IdealState;
50 import org.apache.helix.model.LiveInstance;
51 import org.apache.helix.model.PersistentStats;
52 import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
53 import org.apache.log4j.Logger;
54
55
56
57
58
59
60
61
62 public class StatsAggregationStage extends AbstractBaseStage
63 {
64
65 public static final int ALERT_HISTORY_SIZE = 30;
66
67 private static final Logger logger =
68 Logger.getLogger(StatsAggregationStage.class.getName());
69
70 StatsHolder _statsHolder = null;
71 AlertsHolder _alertsHolder = null;
72 Map<String, Map<String, AlertValueAndStatus>> _alertStatus;
73 Map<String, Tuple<String>> _statStatus;
74 ClusterAlertMBeanCollection _alertBeanCollection = new ClusterAlertMBeanCollection();
75 Map<String, String> _alertActionTaken = new HashMap<String, String>();
76
77 public final String PARTICIPANT_STAT_REPORT_NAME = StatHealthReportProvider.REPORT_NAME;
78 public final String ESPRESSO_STAT_REPORT_NAME = "RestQueryStats";
79 public final String REPORT_NAME = "AggStats";
80
81
82
83
84
85 public StatHealthReportProvider _aggStatsProvider;
86
87
88
89 public Map<String, Map<String, AlertValueAndStatus>> getAlertStatus()
90 {
91 return _alertStatus;
92 }
93
94 public Map<String, Tuple<String>> getStatStatus()
95 {
96 return _statStatus;
97 }
98
99 public void persistAggStats(HelixManager manager)
100 {
101 Map<String, String> report = _aggStatsProvider.getRecentHealthReport();
102 Map<String, Map<String, String>> partitionReport =
103 _aggStatsProvider.getRecentPartitionHealthReport();
104 ZNRecord record = new ZNRecord(_aggStatsProvider.getReportName());
105 if (report != null)
106 {
107 record.setSimpleFields(report);
108 }
109 if (partitionReport != null)
110 {
111 record.setMapFields(partitionReport);
112 }
113
114
115 HelixDataAccessor accessor = manager.getHelixDataAccessor();
116
117 Builder keyBuilder = accessor.keyBuilder();
118 boolean retVal = accessor.setProperty(keyBuilder.persistantStat(), new PersistentStats(record));
119 if (retVal == false)
120 {
121 logger.error("attempt to persist derived stats failed");
122 }
123 }
124
125 @Override
126 public void init(StageContext context)
127 {
128 }
129
130 public String getAgeStatName(String instance)
131 {
132 return instance + ExpressionParser.statFieldDelim + "reportingage";
133 }
134
135
136 public void reportAgeStat(LiveInstance instance, long modifiedTime, long currTime)
137 {
138 String statName = getAgeStatName(instance.getInstanceName());
139 long age = (currTime - modifiedTime) / 1000;
140
141 Map<String, String> ageStatMap = new HashMap<String, String>();
142 ageStatMap.put(StatsHolder.TIMESTAMP_NAME, String.valueOf(currTime));
143 ageStatMap.put(StatsHolder.VALUE_NAME, String.valueOf(age));
144
145 _statsHolder.applyStat(statName, ageStatMap);
146 }
147
148 @Override
149 public void process(ClusterEvent event) throws Exception
150 {
151 long startTime = System.currentTimeMillis();
152
153
154
155
156 HelixManager manager = event.getAttribute("helixmanager");
157 HealthDataCache cache = event.getAttribute("HealthDataCache");
158
159 if (manager == null || cache == null)
160 {
161 throw new StageException("helixmanager|HealthDataCache attribute value is null");
162 }
163 if(_alertsHolder == null)
164 {
165 _statsHolder = new StatsHolder(manager, cache);
166 _alertsHolder = new AlertsHolder(manager, cache, _statsHolder);
167 }
168 else
169 {
170 _statsHolder.updateCache(cache);
171 _alertsHolder.updateCache(cache);
172 }
173 if (_statsHolder.getStatsList().size() == 0)
174 {
175 if(logger.isTraceEnabled()){
176 logger.trace("stat holder is empty");
177 }
178 return;
179 }
180
181
182
183
184 Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
185
186 long currTime = System.currentTimeMillis();
187
188 long readInstancesStart = System.currentTimeMillis();
189 for (LiveInstance instance : liveInstances.values())
190 {
191 String instanceName = instance.getInstanceName();
192 logger.debug("instanceName: " + instanceName);
193
194
195 Map<String, HealthStat> stats;
196 stats = cache.getHealthStats(instanceName);
197
198 long modTime = -1;
199
200 boolean reportedAge = false;
201 for (HealthStat participantStat : stats.values())
202 {
203 if (participantStat != null && !reportedAge)
204 {
205
206 modTime = participantStat.getLastModifiedTimeStamp();
207 reportAgeStat(instance, modTime, currTime);
208 reportedAge = true;
209 }
210
211
212
213
214 if (participantStat != null)
215 {
216
217
218 Map<String, Map<String, String>> statMap =
219 participantStat.getHealthFields(instanceName);
220 for (String key : statMap.keySet())
221 {
222 _statsHolder.applyStat(key, statMap.get(key));
223 }
224 }
225 }
226 }
227
228
229 _statsHolder.persistStats();
230 logger.info("Done processing stats: "
231 + (System.currentTimeMillis() - readInstancesStart));
232
233 _statStatus = _statsHolder.getStatsMap();
234
235 for (String statKey : _statStatus.keySet())
236 {
237 logger.debug("Stat key, value: " + statKey + ": " + _statStatus.get(statKey));
238 }
239
240 long alertExecuteStartTime = System.currentTimeMillis();
241
242 _alertStatus =
243 AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(),
244 _statsHolder.getStatsList());
245 logger.info("done executing alerts: "
246 + (System.currentTimeMillis() - alertExecuteStartTime));
247 for (String originAlertName : _alertStatus.keySet())
248 {
249 _alertBeanCollection.setAlerts(originAlertName,
250 _alertStatus.get(originAlertName),
251 manager.getClusterName());
252 }
253
254 executeAlertActions(manager);
255
256 updateAlertHistory(manager);
257 long writeAlertStartTime = System.currentTimeMillis();
258
259 _alertsHolder.addAlertStatusSet(_alertStatus);
260 logger.info("done writing alerts: "
261 + (System.currentTimeMillis() - writeAlertStartTime));
262
263
264
265 long logAlertStartTime = System.currentTimeMillis();
266
267 for (String alertOuterKey : _alertStatus.keySet())
268 {
269 logger.debug("Alert Outer Key: " + alertOuterKey);
270 Map<String, AlertValueAndStatus> alertInnerMap = _alertStatus.get(alertOuterKey);
271 if (alertInnerMap == null)
272 {
273 logger.debug(alertOuterKey + " has no alerts to report.");
274 continue;
275 }
276 for (String alertInnerKey : alertInnerMap.keySet())
277 {
278 logger.debug(" " + alertInnerKey + " value: "
279 + alertInnerMap.get(alertInnerKey).getValue() + ", status: "
280 + alertInnerMap.get(alertInnerKey).isFired());
281 }
282 }
283
284 logger.info("done logging alerts: "
285 + (System.currentTimeMillis() - logAlertStartTime));
286
287 long processLatency = System.currentTimeMillis() - startTime;
288 addLatencyToMonitor(event, processLatency);
289 logger.info("process end: " + processLatency);
290 }
291
292
293
294
295
296
297 void executeAlertActions( HelixManager manager)
298 {
299 _alertActionTaken.clear();
300
301 for(String originAlertName : _alertStatus.keySet())
302 {
303 Map<String, String> alertFields = _alertsHolder.getAlertsMap().get(originAlertName);
304 if(alertFields != null && alertFields.containsKey(AlertParser.ACTION_NAME))
305 {
306 String actionValue = alertFields.get(AlertParser.ACTION_NAME);
307 Map<String, AlertValueAndStatus> alertResultMap = _alertStatus.get(originAlertName);
308 if(alertResultMap == null)
309 {
310 logger.info("Alert "+ originAlertName + " does not have alert status map");
311 continue;
312 }
313
314 for(String actualStatName : alertResultMap.keySet())
315 {
316
317 if(alertResultMap.get(actualStatName).isFired())
318 {
319 logger.warn("Alert " + originAlertName + " action " + actionValue + " is triggered by " + actualStatName);
320 _alertActionTaken.put(actualStatName, actionValue);
321
322 executeAlertAction(actualStatName, actionValue, manager);
323 }
324 }
325 }
326 }
327 }
328
329
330
331
332 void executeAlertAction(String actualStatName, String actionValue, HelixManager manager)
333 {
334 if(actionValue.equals(ActionOnError.DISABLE_INSTANCE.toString()))
335 {
336 String instanceName = parseInstanceName(actualStatName, manager);
337 if(instanceName != null)
338 {
339 logger.info("Disabling instance " + instanceName);
340 manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName, false);
341 }
342 }
343 else if(actionValue.equals(ActionOnError.DISABLE_PARTITION.toString()))
344 {
345 String instanceName = parseInstanceName(actualStatName, manager);
346 String resourceName = parseResourceName(actualStatName, manager);
347 String partitionName = parsePartitionName(actualStatName, manager);
348 if(instanceName != null && resourceName != null && partitionName != null)
349 {
350 logger.info("Disabling partition " + partitionName + " instanceName " + instanceName);
351 manager.getClusterManagmentTool().enablePartition(false, manager.getClusterName(), instanceName,
352 resourceName, Arrays.asList(partitionName));
353 }
354 }
355 else if(actionValue.equals(ActionOnError.DISABLE_RESOURCE.toString()))
356 {
357 String instanceName = parseInstanceName(actualStatName, manager);
358 String resourceName = parseResourceName(actualStatName, manager);
359 logger.info("Disabling resource " + resourceName + " instanceName " + instanceName + " not implemented");
360
361 }
362 }
363
364 public static String parseResourceName(String actualStatName, HelixManager manager)
365 {
366 HelixDataAccessor accessor = manager.getHelixDataAccessor();
367 Builder kb = accessor.keyBuilder();
368 List<IdealState> idealStates = accessor.getChildValues(kb.idealStates());
369 for (IdealState idealState : idealStates)
370 {
371 String resourceName = idealState.getResourceName();
372 if(actualStatName.contains("=" + resourceName + ".") || actualStatName.contains("=" + resourceName + ";"))
373 {
374 return resourceName;
375 }
376 }
377 return null;
378 }
379
380 public static String parsePartitionName(String actualStatName, HelixManager manager)
381 {
382 String resourceName = parseResourceName(actualStatName, manager);
383 if(resourceName != null)
384 {
385 String partitionKey = "=" + resourceName + "_";
386 if(actualStatName.contains(partitionKey))
387 {
388 int pos = actualStatName.indexOf(partitionKey);
389 int nextDotPos = actualStatName.indexOf('.', pos + partitionKey.length());
390 int nextCommaPos = actualStatName.indexOf(';', pos + partitionKey.length());
391 if(nextCommaPos > 0 && nextCommaPos < nextDotPos)
392 {
393 nextDotPos = nextCommaPos;
394 }
395
396 String partitionName = actualStatName.substring(pos + 1, nextDotPos);
397 return partitionName;
398 }
399 }
400 return null;
401 }
402
403 public static String parseInstanceName(String actualStatName, HelixManager manager)
404 {
405 HelixDataAccessor accessor = manager.getHelixDataAccessor();
406 Builder kb = accessor.keyBuilder();
407 List<LiveInstance> liveInstances = accessor.getChildValues(kb.liveInstances());
408 for (LiveInstance instance : liveInstances)
409 {
410 String instanceName = instance.getInstanceName();
411 if(actualStatName.startsWith(instanceName))
412 {
413 return instanceName;
414 }
415 }
416 return null;
417 }
418
419 void updateAlertHistory(HelixManager manager)
420 {
421
422 _alertBeanCollection.refreshAlertDelta(manager.getClusterName());
423 Map<String, String> delta = _alertBeanCollection.getRecentAlertDelta();
424
425 if(delta.size() > 0)
426 {
427 delta.putAll(_alertActionTaken);
428 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh:mm:ss:SSS");
429 String date = dateFormat.format(new Date());
430
431 HelixDataAccessor accessor = manager.getHelixDataAccessor();
432 Builder keyBuilder = accessor.keyBuilder();
433
434 HelixProperty property = accessor.getProperty(keyBuilder.alertHistory());
435 ZNRecord alertFiredHistory;
436 if(property == null)
437 {
438 alertFiredHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
439 }
440 else
441 {
442 alertFiredHistory = property.getRecord();
443 }
444 while(alertFiredHistory.getMapFields().size() >= ALERT_HISTORY_SIZE)
445 {
446
447 String firstKey = (String)(alertFiredHistory.getMapFields().keySet().toArray()[0]);
448 alertFiredHistory.getMapFields().remove(firstKey);
449 }
450 alertFiredHistory.setMapField(date, delta);
451
452 accessor.setProperty(keyBuilder.alertHistory(), new AlertHistory(alertFiredHistory));
453 _alertBeanCollection.setAlertHistory(alertFiredHistory);
454 }
455 }
456
457 public ClusterAlertMBeanCollection getClusterAlertMBeanCollection()
458 {
459 return _alertBeanCollection;
460 }
461 }