1 package org.apache.helix.alerts;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.regex.Pattern;
29
30 import org.apache.helix.HelixDataAccessor;
31 import org.apache.helix.HelixException;
32 import org.apache.helix.HelixManager;
33 import org.apache.helix.PropertyKey;
34 import org.apache.helix.PropertyType;
35 import org.apache.helix.ZNRecord;
36 import org.apache.helix.PropertyKey.Builder;
37 import org.apache.helix.controller.stages.HealthDataCache;
38 import org.apache.helix.model.PersistentStats;
39 import org.apache.log4j.Logger;
40
41
42 public class StatsHolder
43 {
44 enum MatchResult {WILDCARDMATCH, EXACTMATCH, NOMATCH};
45
46 private static final Logger logger = Logger.getLogger(StatsHolder.class
47 .getName());
48
49 public static final String VALUE_NAME = "value";
50 public static final String TIMESTAMP_NAME = "TimeStamp";
51
52 HelixDataAccessor _accessor;
53 HealthDataCache _cache;
54
55 Map<String, Map<String, String>> _statMap;
56 Map<String, Map<String, MatchResult>> _statAlertMatchResult;
57
58 private Builder _keyBuilder;
59
60
/**
 * Creates a holder bound to the manager's data accessor and cluster name and
 * loads the initial stat map from {@code cache}.
 *
 * @param manager supplies the data accessor and the cluster name
 * @param cache health data cache the persisted stats are read from
 */
public StatsHolder(HelixManager manager, HealthDataCache cache)
{
  this._accessor = manager.getHelixDataAccessor();
  this._cache = cache;
  this._keyBuilder = new PropertyKey.Builder(manager.getClusterName());
  // Populates _statMap from whatever the cache currently holds.
  updateCache(cache);
  this._statAlertMatchResult = new HashMap<String, Map<String, MatchResult>>();
}
70
71 public void refreshStats()
72 {
73 logger.info("Refreshing cached stats");
74 _cache.refresh(_accessor);
75 updateCache(_cache);
76 }
77
78 public void persistStats()
79 {
80
81
82 PersistentStats stats = _accessor.getProperty(_keyBuilder.persistantStat());
83 if (stats == null)
84 {
85 stats = new PersistentStats(PersistentStats.nodeName);
86
87
88 }
89 stats.getRecord().setMapFields(_statMap);
90 boolean retVal = _accessor.setProperty(_keyBuilder.persistantStat(),
91 stats);
92 }
93
94 public void getStatsFromCache(boolean refresh)
95 {
96 long refreshStartTime = System.currentTimeMillis();
97 if (refresh) {
98 _cache.refresh(_accessor);
99 }
100 PersistentStats persistentStatRecord = _cache.getPersistentStats();
101 if (persistentStatRecord != null) {
102 _statMap = persistentStatRecord.getMapFields();
103 }
104 else {
105 _statMap = new HashMap<String,Map<String,String>>();
106 }
107
108
109
110
111
112
113
114
115
116
117
118
119 System.out.println("Refresh stats done: "+(System.currentTimeMillis() - refreshStartTime));
120 }
121
122 public Iterator<String> getAllStats()
123 {
124 return null;
125 }
126
127
128
129
130
131 public Map<String, String> mergeStats(String statName,
132 Map<String, String> existingStat, Map<String, String> incomingStat)
133 throws HelixException
134 {
135 if (existingStat == null)
136 {
137 throw new HelixException("existing stat for merge is null");
138 }
139 if (incomingStat == null)
140 {
141 throw new HelixException("incoming stat for merge is null");
142 }
143
144 String aggTypeStr = ExpressionParser.getAggregatorStr(statName);
145 String[] aggArgs = ExpressionParser.getAggregatorArgs(statName);
146 Aggregator agg = ExpressionParser.getAggregator(aggTypeStr);
147
148
149
150 String existingTime = existingStat.get(TIMESTAMP_NAME);
151 String existingVal = existingStat.get(VALUE_NAME);
152 String incomingTime = incomingStat.get(TIMESTAMP_NAME);
153 String incomingVal = incomingStat.get(VALUE_NAME);
154
155 Tuple<String> existingTimeTuple = (existingTime != null) ? Tuple
156 .fromString(existingTime) : null;
157 Tuple<String> existingValueTuple = (existingVal != null) ? Tuple
158 .fromString(existingVal) : null;
159 Tuple<String> incomingTimeTuple = (incomingTime != null) ? Tuple
160 .fromString(incomingTime) : null;
161 Tuple<String> incomingValueTuple = (incomingVal != null) ? Tuple
162 .fromString(incomingVal) : null;
163
164
165 agg.merge(existingValueTuple, incomingValueTuple, existingTimeTuple,
166 incomingTimeTuple, aggArgs);
167
168 Map<String, String> mergedMap = new HashMap<String, String>();
169 if (existingTimeTuple.size() == 0)
170 {
171 throw new HelixException("merged time tuple has size zero");
172 }
173 if (existingValueTuple.size() == 0)
174 {
175 throw new HelixException("merged value tuple has size zero");
176 }
177
178 mergedMap.put(TIMESTAMP_NAME, existingTimeTuple.toString());
179 mergedMap.put(VALUE_NAME, existingValueTuple.toString());
180 return mergedMap;
181 }
182
183
184
185
186
187
188
189
190
191 public void applyStat(String incomingStatName, Map<String, String> statFields)
192 {
193
194
195
196 Map<String, Map<String, String>> pendingAdds = new HashMap<String, Map<String, String>>();
197
198 if(!_statAlertMatchResult.containsKey(incomingStatName))
199 {
200 _statAlertMatchResult.put(incomingStatName, new HashMap<String, MatchResult>());
201 }
202 Map<String, MatchResult> resultMap = _statAlertMatchResult.get(incomingStatName);
203
204 for (String key : _statMap.keySet())
205 {
206 if(resultMap.containsKey(key))
207 {
208 MatchResult cachedMatchResult = resultMap.get(key);
209 if(cachedMatchResult == MatchResult.EXACTMATCH)
210 {
211 processExactMatch(key, statFields);
212 }
213 else if(cachedMatchResult == MatchResult.WILDCARDMATCH)
214 {
215 processWildcardMatch(incomingStatName, key,statFields, pendingAdds);
216 }
217
218 continue;
219 }
220
221 if (ExpressionParser.isIncomingStatExactMatch(key, incomingStatName))
222 {
223 processExactMatch(key, statFields);
224 resultMap.put(key, MatchResult.EXACTMATCH);
225 }
226
227 else if (ExpressionParser.isIncomingStatWildcardMatch(key,
228 incomingStatName))
229 {
230 processWildcardMatch(incomingStatName, key,statFields, pendingAdds);
231 resultMap.put(key, MatchResult.WILDCARDMATCH);
232 }
233 else
234 {
235 resultMap.put(key, MatchResult.NOMATCH);
236 }
237 }
238 _statMap.putAll(pendingAdds);
239 }
240
241 void processExactMatch(String key, Map<String, String> statFields)
242 {
243 Map<String, String> mergedStat = mergeStats(key, _statMap.get(key),
244 statFields);
245
246 _statMap.put(key, mergedStat);
247 }
248
249 void processWildcardMatch(String incomingStatName, String key,
250 Map<String, String> statFields, Map<String, Map<String, String>> pendingAdds)
251 {
252
253
254
255
256
257 String statToAdd = ExpressionParser.getWildcardStatSubstitution(key,
258 incomingStatName);
259
260
261
262
263 if (!_statMap.containsKey(statToAdd)
264 && !pendingAdds.containsKey(statToAdd))
265 {
266
267 Map<String, String> mergedStat = mergeStats(statToAdd,
268 getEmptyStat(), statFields);
269
270
271 pendingAdds.put(statToAdd, mergedStat);
272 }
273 }
274
275
276
277 public void addStat(String exp) throws HelixException
278 {
279 refreshStats();
280
281 String[] parsedStats = ExpressionParser.getBaseStats(exp);
282
283 for (String stat : parsedStats)
284 {
285 if (_statMap.containsKey(stat))
286 {
287 logger.debug("Stat " + stat + " already exists; not adding");
288 continue;
289 }
290 _statMap.put(stat, getEmptyStat());
291 }
292 }
293
294 public static Map<String, Map<String, String>> parseStat(String exp)
295 throws HelixException
296 {
297 String[] parsedStats = ExpressionParser.getBaseStats(exp);
298 Map<String, Map<String, String>> statMap = new HashMap<String, Map<String, String>>();
299
300 for (String stat : parsedStats)
301 {
302 if (statMap.containsKey(stat))
303 {
304 logger.debug("Stat " + stat + " already exists; not adding");
305 continue;
306 }
307 statMap.put(stat, getEmptyStat());
308 }
309 return statMap;
310 }
311
312
313 public static Map<String, String> getEmptyStat()
314 {
315 Map<String, String> statFields = new HashMap<String, String>();
316 statFields.put(TIMESTAMP_NAME, "");
317 statFields.put(VALUE_NAME, "");
318 return statFields;
319 }
320
321 public List<Stat> getStatsList()
322 {
323 List<Stat> stats = new LinkedList<Stat>();
324 for (String stat : _statMap.keySet())
325 {
326 Map<String, String> statFields = _statMap.get(stat);
327 Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
328 Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
329 Stat s = new Stat(stat, valTup, timeTup);
330 stats.add(s);
331 }
332 return stats;
333 }
334
335 public Map<String, Tuple<String>> getStatsMap()
336 {
337
338 HashMap<String, Tuple<String>> stats = new HashMap<String, Tuple<String>>();
339 for (String stat : _statMap.keySet())
340 {
341 Map<String, String> statFields = _statMap.get(stat);
342 Tuple<String> valTup = Tuple.fromString(statFields.get(VALUE_NAME));
343 Tuple<String> timeTup = Tuple.fromString(statFields.get(TIMESTAMP_NAME));
344 stats.put(stat, valTup);
345 }
346 return stats;
347 }
348
349 public void updateCache(HealthDataCache cache)
350 {
351 _cache = cache;
352 PersistentStats persistentStatRecord = _cache.getPersistentStats();
353 if (persistentStatRecord != null)
354 {
355 _statMap = persistentStatRecord.getMapFields();
356 }
357 else
358 {
359 _statMap = new HashMap<String, Map<String, String>>();
360 }
361 }
362 }