View Javadoc

1   package org.apache.helix.controller.stages;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.text.SimpleDateFormat;
23  import java.util.Arrays;
24  import java.util.Date;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  
29  import org.apache.helix.HelixDataAccessor;
30  import org.apache.helix.HelixManager;
31  import org.apache.helix.HelixProperty;
32  import org.apache.helix.PropertyType;
33  import org.apache.helix.ZNRecord;
34  import org.apache.helix.PropertyKey.Builder;
35  import org.apache.helix.alerts.AlertParser;
36  import org.apache.helix.alerts.AlertProcessor;
37  import org.apache.helix.alerts.AlertValueAndStatus;
38  import org.apache.helix.alerts.AlertsHolder;
39  import org.apache.helix.alerts.ExpressionParser;
40  import org.apache.helix.alerts.StatsHolder;
41  import org.apache.helix.alerts.Tuple;
42  import org.apache.helix.controller.pipeline.AbstractBaseStage;
43  import org.apache.helix.controller.pipeline.StageContext;
44  import org.apache.helix.controller.pipeline.StageException;
45  import org.apache.helix.healthcheck.StatHealthReportProvider;
46  import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
47  import org.apache.helix.model.AlertHistory;
48  import org.apache.helix.model.HealthStat;
49  import org.apache.helix.model.IdealState;
50  import org.apache.helix.model.LiveInstance;
51  import org.apache.helix.model.PersistentStats;
52  import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
53  import org.apache.log4j.Logger;
54  
55  
56  /**
57   * For each LiveInstances select currentState and message whose sessionId matches
58   * sessionId from LiveInstance Get Partition,State for all the resources computed in
59   * previous State [ResourceComputationStage]
60   *
61   */
62  public class StatsAggregationStage extends AbstractBaseStage
63  {
64  
65    public static final int ALERT_HISTORY_SIZE = 30;
66  
67    private static final Logger logger =
68        Logger.getLogger(StatsAggregationStage.class.getName());
69  
70    StatsHolder _statsHolder = null;
71    AlertsHolder _alertsHolder = null;
72    Map<String, Map<String, AlertValueAndStatus>> _alertStatus;
73    Map<String, Tuple<String>> _statStatus;
74    ClusterAlertMBeanCollection _alertBeanCollection = new ClusterAlertMBeanCollection();
75    Map<String, String> _alertActionTaken = new HashMap<String, String>();
76  
77    public final String PARTICIPANT_STAT_REPORT_NAME = StatHealthReportProvider.REPORT_NAME;
78    public final String ESPRESSO_STAT_REPORT_NAME = "RestQueryStats";
79    public final String REPORT_NAME = "AggStats";
80    // public final String DEFAULT_AGG_TYPE = "decay";
81    // public final String DEFAULT_DECAY_PARAM = "0.1";
82    // public final String DEFAULT_AGG_TYPE = "window";
83    // public final String DEFAULT_DECAY_PARAM = "5";
84  
85    public StatHealthReportProvider _aggStatsProvider;
86  
87    // public AggregationType _defaultAggType;
88  
89    public Map<String, Map<String, AlertValueAndStatus>> getAlertStatus()
90    {
91      return _alertStatus;
92    }
93  
94    public Map<String, Tuple<String>> getStatStatus()
95    {
96      return _statStatus;
97    }
98  
99    public void persistAggStats(HelixManager manager)
100   {
101     Map<String, String> report = _aggStatsProvider.getRecentHealthReport();
102     Map<String, Map<String, String>> partitionReport =
103         _aggStatsProvider.getRecentPartitionHealthReport();
104     ZNRecord record = new ZNRecord(_aggStatsProvider.getReportName());
105     if (report != null)
106     {
107       record.setSimpleFields(report);
108     }
109     if (partitionReport != null)
110     {
111       record.setMapFields(partitionReport);
112     }
113 
114 //    DataAccessor accessor = manager.getDataAccessor();
115     HelixDataAccessor accessor = manager.getHelixDataAccessor();
116 //    boolean retVal = accessor.setProperty(PropertyType.PERSISTENTSTATS, record);
117     Builder keyBuilder = accessor.keyBuilder();
118     boolean retVal = accessor.setProperty(keyBuilder.persistantStat(), new PersistentStats(record));
119     if (retVal == false)
120     {
121       logger.error("attempt to persist derived stats failed");
122     }
123   }
124 
125   @Override
126   public void init(StageContext context)
127   {
128   }
129 
130   public String getAgeStatName(String instance)
131   {
132     return instance + ExpressionParser.statFieldDelim + "reportingage";
133   }
134 
135   // currTime in seconds
136   public void reportAgeStat(LiveInstance instance, long modifiedTime, long currTime)
137   {
138     String statName = getAgeStatName(instance.getInstanceName());
139     long age = (currTime - modifiedTime) / 1000; // XXX: ensure this is in
140                                                  // seconds
141     Map<String, String> ageStatMap = new HashMap<String, String>();
142     ageStatMap.put(StatsHolder.TIMESTAMP_NAME, String.valueOf(currTime));
143     ageStatMap.put(StatsHolder.VALUE_NAME, String.valueOf(age));
144     // note that applyStat will only work if alert already added
145     _statsHolder.applyStat(statName, ageStatMap);
146   }
147 
148   @Override
149   public void process(ClusterEvent event) throws Exception
150   {
151     long startTime = System.currentTimeMillis();
152     // String aggTypeName =
153     // DEFAULT_AGG_TYPE+AggregationType.DELIM+DEFAULT_DECAY_PARAM;
154     // _defaultAggType = AggregationTypeFactory.getAggregationType(aggTypeName);
155 
156     HelixManager manager = event.getAttribute("helixmanager");
157     HealthDataCache cache = event.getAttribute("HealthDataCache");
158 
159     if (manager == null || cache == null)
160     {
161       throw new StageException("helixmanager|HealthDataCache attribute value is null");
162     }
163     if(_alertsHolder == null)
164     {
165       _statsHolder = new StatsHolder(manager, cache);
166       _alertsHolder = new AlertsHolder(manager, cache, _statsHolder);
167     }
168     else
169     {
170       _statsHolder.updateCache(cache);
171       _alertsHolder.updateCache(cache);
172     }
173     if (_statsHolder.getStatsList().size() == 0)
174     {
175       if(logger.isTraceEnabled()){
176         logger.trace("stat holder is empty");
177       }
178       return;
179     }
180 
181     // init agg stats from cache
182     // initAggStats(cache);
183 
184     Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
185 
186     long currTime = System.currentTimeMillis();
187     // for each live node, read node's stats
188     long readInstancesStart = System.currentTimeMillis();
189     for (LiveInstance instance : liveInstances.values())
190     {
191       String instanceName = instance.getInstanceName();
192       logger.debug("instanceName: " + instanceName);
193       // XXX: now have map of HealthStats, so no need to traverse them...verify
194       // correctness
195       Map<String, HealthStat> stats;
196       stats = cache.getHealthStats(instanceName);
197       // find participants stats
198       long modTime = -1;
199       // TODO: get healthreport child node modified time and reportAgeStat based on that
200       boolean reportedAge = false;
201       for (HealthStat participantStat : stats.values())
202       {
203         if (participantStat != null && !reportedAge)
204         {
205           // generate and report stats for how old this node's report is
206           modTime = participantStat.getLastModifiedTimeStamp();
207           reportAgeStat(instance, modTime, currTime);
208           reportedAge = true;
209         }
210         // System.out.println(modTime);
211         // XXX: need to convert participantStat to a better format
212         // need to get instanceName in here
213 
214         if (participantStat != null)
215         {
216           // String timestamp = String.valueOf(instance.getModifiedTime()); WANT
217           // REPORT LEVEL TS
218           Map<String, Map<String, String>> statMap =
219               participantStat.getHealthFields(instanceName);
220           for (String key : statMap.keySet())
221           {
222             _statsHolder.applyStat(key, statMap.get(key));
223           }
224         }
225       }
226     }
227     // Call _statsHolder.persistStats() once per pipeline. This will
228     // write the updated persisted stats into zookeeper
229     _statsHolder.persistStats();
230     logger.info("Done processing stats: "
231         + (System.currentTimeMillis() - readInstancesStart));
232     // populate _statStatus
233     _statStatus = _statsHolder.getStatsMap();
234 
235     for (String statKey : _statStatus.keySet())
236     {
237       logger.debug("Stat key, value: " + statKey + ": " + _statStatus.get(statKey));
238     }
239 
240     long alertExecuteStartTime = System.currentTimeMillis();
241     // execute alerts, populate _alertStatus
242     _alertStatus =
243         AlertProcessor.executeAllAlerts(_alertsHolder.getAlertList(),
244                                         _statsHolder.getStatsList());
245     logger.info("done executing alerts: "
246         + (System.currentTimeMillis() - alertExecuteStartTime));
247     for (String originAlertName : _alertStatus.keySet())
248     {
249       _alertBeanCollection.setAlerts(originAlertName,
250                                      _alertStatus.get(originAlertName),
251                                      manager.getClusterName());
252     }
253 
254     executeAlertActions(manager);
255     // Write alert fire history to zookeeper
256     updateAlertHistory(manager);
257     long writeAlertStartTime = System.currentTimeMillis();
258     // write out alert status (to zk)
259     _alertsHolder.addAlertStatusSet(_alertStatus);
260     logger.info("done writing alerts: "
261         + (System.currentTimeMillis() - writeAlertStartTime));
262 
263     // TODO: access the 2 status variables from somewhere to populate graphs
264 
265     long logAlertStartTime = System.currentTimeMillis();
266     // logging alert status
267     for (String alertOuterKey : _alertStatus.keySet())
268     {
269       logger.debug("Alert Outer Key: " + alertOuterKey);
270       Map<String, AlertValueAndStatus> alertInnerMap = _alertStatus.get(alertOuterKey);
271       if (alertInnerMap == null)
272       {
273         logger.debug(alertOuterKey + " has no alerts to report.");
274         continue;
275       }
276       for (String alertInnerKey : alertInnerMap.keySet())
277       {
278         logger.debug("  " + alertInnerKey + " value: "
279             + alertInnerMap.get(alertInnerKey).getValue() + ", status: "
280             + alertInnerMap.get(alertInnerKey).isFired());
281       }
282     }
283 
284     logger.info("done logging alerts: "
285         + (System.currentTimeMillis() - logAlertStartTime));
286 
287     long processLatency = System.currentTimeMillis() - startTime;
288     addLatencyToMonitor(event, processLatency);
289     logger.info("process end: " + processLatency);
290   }
291 
292   /**
293    * Go through the _alertStatus, and call executeAlertAction for those actual alerts that
294    * has been fired
295    */
296 
297   void executeAlertActions( HelixManager manager)
298   {
299     _alertActionTaken.clear();
300     // Go through the original alert strings
301     for(String originAlertName : _alertStatus.keySet())
302     {
303       Map<String, String> alertFields = _alertsHolder.getAlertsMap().get(originAlertName);
304       if(alertFields != null && alertFields.containsKey(AlertParser.ACTION_NAME))
305       {
306         String actionValue = alertFields.get(AlertParser.ACTION_NAME);
307         Map<String, AlertValueAndStatus> alertResultMap = _alertStatus.get(originAlertName);
308         if(alertResultMap == null)
309         {
310           logger.info("Alert "+ originAlertName + " does not have alert status map");
311           continue;
312         }
313         // For each original alert, iterate all actual alerts that it expands into
314         for(String actualStatName : alertResultMap.keySet())
315         {
316           // if the actual alert is fired, execute the action
317           if(alertResultMap.get(actualStatName).isFired())
318           {
319             logger.warn("Alert " + originAlertName + " action " + actionValue + " is triggered by " + actualStatName);
320             _alertActionTaken.put(actualStatName, actionValue);
321             // move functionalities into a seperate class
322             executeAlertAction(actualStatName, actionValue, manager);
323           }
324         }
325       }
326     }
327   }
328   /**
329    * Execute the action if an alert is fired, and the alert has an action associated with it.
330    * NOTE: consider unify this with DefaultParticipantErrorMessageHandler.handleMessage()
331    */
332   void executeAlertAction(String actualStatName, String actionValue, HelixManager manager)
333   {
334     if(actionValue.equals(ActionOnError.DISABLE_INSTANCE.toString()))
335     {
336       String instanceName = parseInstanceName(actualStatName, manager);
337       if(instanceName != null)
338       {
339         logger.info("Disabling instance " + instanceName);
340         manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName, false);
341       }
342     }
343     else if(actionValue.equals(ActionOnError.DISABLE_PARTITION.toString()))
344     {
345       String instanceName = parseInstanceName(actualStatName, manager);
346       String resourceName = parseResourceName(actualStatName, manager);
347       String partitionName = parsePartitionName(actualStatName, manager);
348       if(instanceName != null && resourceName != null && partitionName != null)
349       {
350         logger.info("Disabling partition " + partitionName + " instanceName " +  instanceName);
351         manager.getClusterManagmentTool().enablePartition(false, manager.getClusterName(), instanceName,
352             resourceName, Arrays.asList(partitionName));
353       }
354     }
355     else if(actionValue.equals(ActionOnError.DISABLE_RESOURCE.toString()))
356     {
357       String instanceName = parseInstanceName(actualStatName, manager);
358       String resourceName = parseResourceName(actualStatName, manager);
359       logger.info("Disabling resource " + resourceName + " instanceName " +  instanceName + " not implemented");
360 
361     }
362   }
363 
364   public static String parseResourceName(String actualStatName, HelixManager manager)
365   {
366     HelixDataAccessor accessor = manager.getHelixDataAccessor();
367     Builder kb = accessor.keyBuilder();
368     List<IdealState> idealStates = accessor.getChildValues(kb.idealStates());
369     for (IdealState idealState : idealStates)
370     {
371       String resourceName = idealState.getResourceName();
372       if(actualStatName.contains("=" + resourceName + ".") || actualStatName.contains("=" + resourceName + ";"))
373       {
374         return resourceName;
375       }
376     }
377     return null;
378   }
379 
380   public static String parsePartitionName(String actualStatName, HelixManager manager)
381   {
382     String resourceName = parseResourceName(actualStatName, manager);
383     if(resourceName != null)
384     {
385       String partitionKey = "=" + resourceName + "_";
386       if(actualStatName.contains(partitionKey))
387       {
388         int pos = actualStatName.indexOf(partitionKey);
389         int nextDotPos = actualStatName.indexOf('.', pos + partitionKey.length());
390         int nextCommaPos = actualStatName.indexOf(';', pos + partitionKey.length());
391         if(nextCommaPos > 0 && nextCommaPos < nextDotPos)
392         {
393           nextDotPos = nextCommaPos;
394         }
395 
396         String partitionName = actualStatName.substring(pos + 1, nextDotPos);
397         return partitionName;
398       }
399     }
400     return null;
401   }
402 
403   public static String parseInstanceName(String actualStatName, HelixManager manager)
404   {
405     HelixDataAccessor accessor = manager.getHelixDataAccessor();
406     Builder kb = accessor.keyBuilder();
407     List<LiveInstance> liveInstances = accessor.getChildValues(kb.liveInstances());
408     for (LiveInstance instance : liveInstances)
409     {
410       String instanceName = instance.getInstanceName();
411       if(actualStatName.startsWith(instanceName))
412       {
413         return instanceName;
414       }
415     }
416     return null;
417   }
418 
419   void updateAlertHistory(HelixManager manager)
420   {
421    // Write alert fire history to zookeeper
422     _alertBeanCollection.refreshAlertDelta(manager.getClusterName());
423     Map<String, String> delta = _alertBeanCollection.getRecentAlertDelta();
424     // Update history only when some beans has changed
425     if(delta.size() > 0)
426     {
427       delta.putAll(_alertActionTaken);
428       SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh:mm:ss:SSS");
429       String date = dateFormat.format(new Date());
430 
431       HelixDataAccessor accessor = manager.getHelixDataAccessor();
432       Builder keyBuilder = accessor.keyBuilder();
433 
434       HelixProperty property = accessor.getProperty(keyBuilder.alertHistory());
435       ZNRecord alertFiredHistory;
436       if(property == null)
437       {
438         alertFiredHistory = new ZNRecord(PropertyType.ALERT_HISTORY.toString());
439       }
440       else
441       {
442         alertFiredHistory = property.getRecord();
443       }
444       while(alertFiredHistory.getMapFields().size() >= ALERT_HISTORY_SIZE)
445       {
446         // ZNRecord uses TreeMap which is sorted ascending internally
447         String firstKey = (String)(alertFiredHistory.getMapFields().keySet().toArray()[0]);
448         alertFiredHistory.getMapFields().remove(firstKey);
449       }
450       alertFiredHistory.setMapField(date, delta);
451 //      manager.getDataAccessor().setProperty(PropertyType.ALERT_HISTORY, alertFiredHistory);
452       accessor.setProperty(keyBuilder.alertHistory(), new AlertHistory(alertFiredHistory));
453       _alertBeanCollection.setAlertHistory(alertFiredHistory);
454     }
455   }
456 
457   public ClusterAlertMBeanCollection getClusterAlertMBeanCollection()
458   {
459     return _alertBeanCollection;
460   }
461 }