View Javadoc

1   package org.apache.helix.alerts;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.HashMap;
23  import java.util.Iterator;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Set;
28  import java.util.regex.Pattern;
29  
30  import org.apache.helix.HelixDataAccessor;
31  import org.apache.helix.HelixException;
32  import org.apache.helix.HelixManager;
33  import org.apache.helix.PropertyKey;
34  import org.apache.helix.PropertyType;
35  import org.apache.helix.ZNRecord;
36  import org.apache.helix.PropertyKey.Builder;
37  import org.apache.helix.controller.stages.HealthDataCache;
38  import org.apache.helix.model.PersistentStats;
39  import org.apache.log4j.Logger;
40  
41  
42  public class StatsHolder
43  {
44    enum MatchResult {WILDCARDMATCH, EXACTMATCH, NOMATCH};
45    
46    private static final Logger logger = Logger.getLogger(StatsHolder.class
47        .getName());
48  
49    public static final String VALUE_NAME = "value";
50    public static final String TIMESTAMP_NAME = "TimeStamp";
51  
52    HelixDataAccessor _accessor;
53    HealthDataCache _cache;
54  
55    Map<String, Map<String, String>> _statMap;
56    Map<String, Map<String, MatchResult>> _statAlertMatchResult;
57  
58    private Builder _keyBuilder;
59    // PersistentStats _persistentStats;
60  
61    public StatsHolder(HelixManager manager, HealthDataCache cache)
62    {
63      _accessor = manager.getHelixDataAccessor();
64      _cache = cache;
65      _keyBuilder = new PropertyKey.Builder(manager.getClusterName());
66      updateCache(_cache);
67      _statAlertMatchResult = new HashMap<String, Map<String, MatchResult>>();
68      
69    }
70  
71    public void refreshStats()
72    {
73      logger.info("Refreshing cached stats");
74      _cache.refresh(_accessor);
75      updateCache(_cache);
76    }
77  
78    public void persistStats()
79    {
80      // XXX: Am I using _accessor too directly here?
81      // took around 35 ms from desktop to ESV4 machine
82      PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
83      if (stats == null)
84      {
85        stats = new PersistentStats(PersistentStats.nodeName); // TODO: fix naming of
86                                                           // this record, if it
87                                                           // matters
88      }
89      stats.getRecord().setMapFields(_statMap);
90      boolean retVal = _accessor.setProperty(_keyBuilder.persistantStat(),
91          stats);
92    }
93  
94    public void getStatsFromCache(boolean refresh)
95    {
96      long refreshStartTime = System.currentTimeMillis();
97      if (refresh) {
98        _cache.refresh(_accessor);
99      }
100     PersistentStats persistentStatRecord = _cache.getPersistentStats();
101     if (persistentStatRecord != null) {
102       _statMap = persistentStatRecord.getMapFields();
103     }
104     else {
105       _statMap = new HashMap<String,Map<String,String>>();
106     }
107     /*
108 		if (_cache.getPersistentStats() != null) {
109 
110 			_statMap = _cache.getPersistentStats();
111 		}
112      */
113     //TODO: confirm this a good place to init the _statMap when null
114     /*
115 		if (_statMap == null) {
116 			_statMap = new HashMap<String, Map<String, String>>();
117 		}
118      */
119     System.out.println("Refresh stats done: "+(System.currentTimeMillis() - refreshStartTime));
120   }
121 
122   public Iterator<String> getAllStats()
123   {
124     return null;
125   }
126 
127   /*
128    * TODO: figure out pre-conditions here. I think not allowing anything to be
129    * null on input
130    */
131   public Map<String, String> mergeStats(String statName,
132       Map<String, String> existingStat, Map<String, String> incomingStat)
133       throws HelixException
134   {
135     if (existingStat == null)
136     {
137       throw new HelixException("existing stat for merge is null");
138     }
139     if (incomingStat == null)
140     {
141       throw new HelixException("incoming stat for merge is null");
142     }
143     // get agg type and arguments, then get agg object
144     String aggTypeStr = ExpressionParser.getAggregatorStr(statName);
145     String[] aggArgs = ExpressionParser.getAggregatorArgs(statName);
146     Aggregator agg = ExpressionParser.getAggregator(aggTypeStr);
147     // XXX: some of below lines might fail with null exceptions
148 
149     // get timestamps, values out of zk maps
150     String existingTime = existingStat.get(TIMESTAMP_NAME);
151     String existingVal = existingStat.get(VALUE_NAME);
152     String incomingTime = incomingStat.get(TIMESTAMP_NAME);
153     String incomingVal = incomingStat.get(VALUE_NAME);
154     // parse values into tuples, if the values exist. else, tuples are null
155     Tuple<String> existingTimeTuple = (existingTime != null) ? Tuple
156         .fromString(existingTime) : null;
157     Tuple<String> existingValueTuple = (existingVal != null) ? Tuple
158         .fromString(existingVal) : null;
159     Tuple<String> incomingTimeTuple = (incomingTime != null) ? Tuple
160         .fromString(incomingTime) : null;
161     Tuple<String> incomingValueTuple = (incomingVal != null) ? Tuple
162         .fromString(incomingVal) : null;
163 
164     // dp merge
165     agg.merge(existingValueTuple, incomingValueTuple, existingTimeTuple,
166         incomingTimeTuple, aggArgs);
167     // put merged tuples back in map
168     Map<String, String> mergedMap = new HashMap<String, String>();
169     if (existingTimeTuple.size() == 0)
170     {
171       throw new HelixException("merged time tuple has size zero");
172     }
173     if (existingValueTuple.size() == 0)
174     {
175       throw new HelixException("merged value tuple has size zero");
176     }
177 
178     mergedMap.put(TIMESTAMP_NAME, existingTimeTuple.toString());
179     mergedMap.put(VALUE_NAME, existingValueTuple.toString());
180     return mergedMap;
181   }
182 
183   /*
184    * Find all persisted stats this stat matches. Update those stats. An incoming
185    * stat can match multiple stats exactly (if that stat has multiple agg types)
186    * An incoming stat can match multiple wildcard stats
187    */
188 
189   // need to do a time check here!
190 
191   public void applyStat(String incomingStatName, Map<String, String> statFields)
192   {
193     // TODO: consider locking stats here
194     //refreshStats(); //will have refreshed by now during stage
195 
196     Map<String, Map<String, String>> pendingAdds = new HashMap<String, Map<String, String>>();
197     
198     if(!_statAlertMatchResult.containsKey(incomingStatName))
199     {
200       _statAlertMatchResult.put(incomingStatName, new HashMap<String, MatchResult>());
201     }
202     Map<String, MatchResult> resultMap = _statAlertMatchResult.get(incomingStatName);
203     // traverse through all persistent stats
204     for (String key : _statMap.keySet())
205     {
206       if(resultMap.containsKey(key))
207       {
208         MatchResult cachedMatchResult = resultMap.get(key);
209         if(cachedMatchResult == MatchResult.EXACTMATCH)
210         {
211           processExactMatch(key, statFields);
212         }
213         else if(cachedMatchResult == MatchResult.WILDCARDMATCH)
214         {
215           processWildcardMatch(incomingStatName, key,statFields, pendingAdds);
216         }
217         // don't care about NOMATCH
218         continue;
219       }
220       // exact match on stat and stat portion of persisted stat, just update
221       if (ExpressionParser.isIncomingStatExactMatch(key, incomingStatName))
222       {
223         processExactMatch(key, statFields);
224         resultMap.put(key, MatchResult.EXACTMATCH);
225       }
226       // wildcard match
227       else if (ExpressionParser.isIncomingStatWildcardMatch(key,
228           incomingStatName))
229       {
230         processWildcardMatch(incomingStatName, key,statFields, pendingAdds);
231         resultMap.put(key, MatchResult.WILDCARDMATCH);
232       }
233       else
234       {
235         resultMap.put(key, MatchResult.NOMATCH);
236       }
237     }
238     _statMap.putAll(pendingAdds);
239   } 
240   
241   void processExactMatch(String key, Map<String, String> statFields)
242   {
243     Map<String, String> mergedStat = mergeStats(key, _statMap.get(key),
244         statFields);
245     // update in place, no problem with hash map
246     _statMap.put(key, mergedStat);
247   }
248   
249   void processWildcardMatch(String incomingStatName, String key, 
250       Map<String, String> statFields,  Map<String, Map<String, String>> pendingAdds)
251   {
252 
253     // make sure incoming stat doesn't already exist, either in previous
254     // round or this round
255     // form new key (incomingStatName with agg type from the wildcarded
256     // stat)
257     String statToAdd = ExpressionParser.getWildcardStatSubstitution(key,
258         incomingStatName);
259     // if the stat already existed in _statMap, we have/will apply it as an
260     // exact match
261     // if the stat was added this round to pendingAdds, no need to recreate
262     // (it would have same value)
263     if (!_statMap.containsKey(statToAdd)
264         && !pendingAdds.containsKey(statToAdd))
265     {
266       // add this stat to persisted stats
267       Map<String, String> mergedStat = mergeStats(statToAdd,
268           getEmptyStat(), statFields);
269       // add to pendingAdds so we don't mess up ongoing traversal of
270       // _statMap
271       pendingAdds.put(statToAdd, mergedStat);
272     }
273   }
274 
275   // add parsing of stat (or is that in expression holder?) at least add
276   // validate
277   public void addStat(String exp) throws HelixException
278   {
279     refreshStats(); // get current stats
280 
281     String[] parsedStats = ExpressionParser.getBaseStats(exp);
282 
283     for (String stat : parsedStats)
284     {
285       if (_statMap.containsKey(stat))
286       {
287         logger.debug("Stat " + stat + " already exists; not adding");
288         continue;
289       }
290       _statMap.put(stat, getEmptyStat()); // add new stat to map
291     }
292   }
293 
294   public static Map<String, Map<String, String>> parseStat(String exp)
295       throws HelixException
296   {
297     String[] parsedStats = ExpressionParser.getBaseStats(exp);
298     Map<String, Map<String, String>> statMap = new HashMap<String, Map<String, String>>();
299 
300     for (String stat : parsedStats)
301     {
302       if (statMap.containsKey(stat))
303       {
304         logger.debug("Stat " + stat + " already exists; not adding");
305         continue;
306       }
307       statMap.put(stat, getEmptyStat()); // add new stat to map
308     }
309     return statMap;
310   }
311 
312 
313   public static Map<String, String> getEmptyStat()
314   {
315     Map<String, String> statFields = new HashMap<String, String>();
316     statFields.put(TIMESTAMP_NAME, "");
317     statFields.put(VALUE_NAME, "");
318     return statFields;
319   }
320 
321   public List<Stat> getStatsList()
322   {
323     List<Stat> stats = new LinkedList<Stat>();
324     for (String stat : _statMap.keySet())
325     {
326       Map<String, String> statFields = _statMap.get(stat);
327       Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
328       Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
329       Stat s = new Stat(stat, valTup, timeTup);
330       stats.add(s);
331     }
332     return stats;
333   }
334 
335   public Map<String, Tuple<String>> getStatsMap()
336   {
337     //refreshStats(); //don't refresh, stage will have refreshed by this time
338     HashMap<String, Tuple<String>> stats = new HashMap<String, Tuple<String>>();
339     for (String stat : _statMap.keySet())
340     {
341       Map<String, String> statFields = _statMap.get(stat);
342       Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
343       Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
344       stats.put(stat, valTup);
345     }
346     return stats;
347   }
348 
349   public void updateCache(HealthDataCache cache)
350   {
351     _cache = cache;
352     PersistentStats persistentStatRecord = _cache.getPersistentStats();
353     if (persistentStatRecord != null)
354     {
355       _statMap = persistentStatRecord.getMapFields();
356     }
357     else
358     {
359       _statMap = new HashMap<String, Map<String, String>>();
360     }
361   }
362 }