1 package org.apache.helix.alerts;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.HashMap;
23 import java.util.Map;
24
25 import org.apache.helix.HelixDataAccessor;
26 import org.apache.helix.ZNRecord;
27 import org.apache.helix.Mocks.MockManager;
28 import org.apache.helix.PropertyKey.Builder;
29 import org.apache.helix.alerts.StatsHolder;
30 import org.apache.helix.alerts.Tuple;
31 import org.apache.helix.controller.stages.HealthDataCache;
32 import org.testng.AssertJUnit;
33 import org.testng.annotations.BeforeMethod;
34 import org.testng.annotations.Test;
35
36
37 public class TestArrivingParticipantStats
38 {
39 protected static final String CLUSTER_NAME = "TestCluster";
40
41 MockManager _helixManager;
42 StatsHolder _statsHolder;
43
44 @BeforeMethod(groups = { "unitTest" })
45 public void setup()
46 {
47 _helixManager = new MockManager(CLUSTER_NAME);
48 _statsHolder = new StatsHolder(_helixManager, new HealthDataCache());
49 }
50
51 public Map<String, String> getStatFields(String value, String timestamp)
52 {
53 Map<String, String> statMap = new HashMap<String, String>();
54 statMap.put(StatsHolder.VALUE_NAME, value);
55 statMap.put(StatsHolder.TIMESTAMP_NAME, timestamp);
56 return statMap;
57 }
58
59 public boolean statRecordContains(ZNRecord rec, String statName)
60 {
61 Map<String, Map<String, String>> stats = rec.getMapFields();
62 return stats.containsKey(statName);
63 }
64
65 public boolean statRecordHasValue(ZNRecord rec, String statName, String value)
66 {
67 Map<String, Map<String, String>> stats = rec.getMapFields();
68 Map<String, String> statFields = stats.get(statName);
69 return (statFields.get(StatsHolder.VALUE_NAME).equals(value));
70 }
71
72 public boolean statRecordHasTimestamp(ZNRecord rec, String statName, String timestamp)
73 {
74 Map<String, Map<String, String>> stats = rec.getMapFields();
75 Map<String, String> statFields = stats.get(statName);
76 return (statFields.get(StatsHolder.TIMESTAMP_NAME).equals(timestamp));
77 }
78
79
80 @Test(groups = { "unitTest" })
81 public void testAddFirstParticipantStat() throws Exception
82 {
83
84 String persistentStat = "accumulate()(dbFoo.partition10.latency)";
85 _statsHolder.addStat(persistentStat);
86
87
88 String incomingStatName = "dbFoo.partition10.latency";
89 Map<String, String> statFields = getStatFields("0", "0");
90 _statsHolder.applyStat(incomingStatName, statFields);
91 _statsHolder.persistStats();
92
93
94 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
95 Builder keyBuilder = accessor.keyBuilder();
96
97 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
98
99 System.out.println("rec: " + rec.toString());
100 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0"));
101 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0"));
102 }
103
104
105 @Test(groups = { "unitTest" })
106 public void testAddRepeatParticipantStat() throws Exception
107 {
108
109 String persistentStat = "accumulate()(dbFoo.partition10.latency)";
110 _statsHolder.addStat(persistentStat);
111
112
113 String incomingStatName = "dbFoo.partition10.latency";
114
115 Map<String, String> statFields = getStatFields("0", "0");
116 _statsHolder.applyStat(incomingStatName, statFields);
117 statFields = getStatFields("1", "10");
118 _statsHolder.applyStat(incomingStatName, statFields);
119 _statsHolder.persistStats();
120
121
122 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
123 Builder keyBuilder = accessor.keyBuilder();
124
125 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
126
127 System.out.println("rec: " + rec.toString());
128 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "1.0"));
129 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "10.0"));
130 }
131
132
133 @Test(groups = { "unitTest" })
134 public void testBackdatedParticipantStat() throws Exception
135 {
136
137 String persistentStat = "accumulate()(dbFoo.partition10.latency)";
138 _statsHolder.addStat(persistentStat);
139
140
141 String incomingStatName = "dbFoo.partition10.latency";
142
143 Map<String, String> statFields = getStatFields("0", "0");
144 _statsHolder.applyStat(incomingStatName, statFields);
145 statFields = getStatFields("1", "10");
146 _statsHolder.applyStat(incomingStatName, statFields);
147 statFields = getStatFields("5", "15");
148 _statsHolder.applyStat(incomingStatName, statFields);
149 statFields = getStatFields("1", "10");
150 _statsHolder.applyStat(incomingStatName, statFields);
151 _statsHolder.persistStats();
152
153
154 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
155 Builder keyBuilder = accessor.keyBuilder();
156
157 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
158
159 System.out.println("rec: " + rec.toString());
160 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "6.0"));
161 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "15.0"));
162 }
163
164
165 @Test(groups = { "unitTest" })
166 public void testAddFirstParticipantStatToWildCard() throws Exception
167 {
168
169 String persistentWildcardStat = "accumulate()(dbFoo.partition*.latency)";
170 _statsHolder.addStat(persistentWildcardStat);
171
172
173 String incomingStatName = "dbFoo.partition10.latency";
174 Map<String, String> statFields = getStatFields("0", "0");
175 _statsHolder.applyStat(incomingStatName, statFields);
176 _statsHolder.persistStats();
177
178
179 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
180 Builder keyBuilder = accessor.keyBuilder();
181
182 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
183
184 System.out.println("rec: " + rec.toString());
185 String persistentStat = "accumulate()(dbFoo.partition10.latency)";
186 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0"));
187 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0"));
188 }
189
190
191 @Test(groups = { "unitTest" })
192 public void testAddSecondParticipantStatToWildCard() throws Exception
193 {
194
195 String persistentWildcardStat = "accumulate()(dbFoo.partition*.latency)";
196 _statsHolder.addStat(persistentWildcardStat);
197
198
199 String incomingStatName = "dbFoo.partition10.latency";
200 Map<String, String> statFields = getStatFields("1", "0");
201 _statsHolder.applyStat(incomingStatName, statFields);
202 statFields = getStatFields("1", "10");
203 _statsHolder.applyStat(incomingStatName, statFields);
204 _statsHolder.persistStats();
205
206
207 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
208 Builder keyBuilder = accessor.keyBuilder();
209
210 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
211
212 System.out.println("rec: " + rec.toString());
213 String persistentStat = "accumulate()(dbFoo.partition10.latency)";
214 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "2.0"));
215 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "10.0"));
216 }
217
218
219 @Test(groups = { "unitTest" })
220 public void testAddParticipantStatToDoubleWildCard() throws Exception
221 {
222
223 String persistentWildcardStat = "accumulate()(db*.partition*.latency)";
224 _statsHolder.addStat(persistentWildcardStat);
225
226
227 String incomingStatName = "dbFoo.partition10.latency";
228 Map<String, String> statFields = getStatFields("0", "0");
229 _statsHolder.applyStat(incomingStatName, statFields);
230 _statsHolder.persistStats();
231
232
233 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
234 Builder keyBuilder = accessor.keyBuilder();
235
236 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
237
238 System.out.println("rec: " + rec.toString());
239 String persistentStat = "accumulate()(dbFoo.partition10.latency)";
240 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0"));
241 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0"));
242 }
243
244 @Test(groups = { "unitTest" })
245 public void testAddWildcardInFirstStatToken() throws Exception
246 {
247 String persistentWildcardStat = "accumulate()(instance*.reportingage)";
248 _statsHolder.addStat(persistentWildcardStat);
249
250
251 String incomingStatName = "instance10.reportingage";
252 Map<String, String> statFields = getStatFields("1", "10");
253 _statsHolder.applyStat(incomingStatName, statFields);
254 _statsHolder.persistStats();
255
256
257 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
258 Builder keyBuilder = accessor.keyBuilder();
259
260 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
261
262 System.out.println("rec: " + rec.toString());
263 String persistentStat = "accumulate()(instance10.reportingage)";
264 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "1.0"));
265 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "10.0"));
266
267 }
268
269
270 @Test(groups = { "unitTest" })
271 public void testAddTwoDistinctParticipantStatsToSameWildCard() throws Exception
272 {
273
274 String persistentWildcardStat = "accumulate()(dbFoo.partition*.latency)";
275 _statsHolder.addStat(persistentWildcardStat);
276
277
278 String incomingStatName = "dbFoo.partition10.latency";
279 Map<String, String> statFields = getStatFields("1", "10");
280 _statsHolder.applyStat(incomingStatName, statFields);
281 incomingStatName = "dbFoo.partition11.latency";
282 statFields = getStatFields("5", "10");
283 _statsHolder.applyStat(incomingStatName, statFields);
284 _statsHolder.persistStats();
285
286
287 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
288 Builder keyBuilder = accessor.keyBuilder();
289
290 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
291
292 System.out.println("rec: " + rec.toString());
293 String persistentStat = "accumulate()(dbFoo.partition10.latency)";
294 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "1.0"));
295 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "10.0"));
296 persistentStat = "accumulate()(dbFoo.partition11.latency)";
297 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "5.0"));
298 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "10.0"));
299 }
300
301
302 @Test(groups = { "unitTest" })
303 public void testWindowStat() throws Exception
304 {
305
306 String persistentWildcardStat = "window(3)(dbFoo.partition*.latency)";
307 _statsHolder.addStat(persistentWildcardStat);
308
309
310 String incomingStatName = "dbFoo.partition10.latency";
311 Map<String, String> statFields = getStatFields("0", "0");
312 _statsHolder.applyStat(incomingStatName, statFields);
313 _statsHolder.persistStats();
314
315
316 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
317 Builder keyBuilder = accessor.keyBuilder();
318
319 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
320
321 System.out.println("rec: " + rec.toString());
322 String persistentStat = "window(3)(dbFoo.partition10.latency)";
323 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0"));
324 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0"));
325
326
327 statFields = getStatFields("10", "1");
328 _statsHolder.applyStat(incomingStatName, statFields);
329 _statsHolder.persistStats();
330
331 rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
332
333 System.out.println("rec: " + rec.toString());
334 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0,10.0"));
335 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0,1.0"));
336
337
338 statFields = getStatFields("20", "2");
339 _statsHolder.applyStat(incomingStatName, statFields);
340 _statsHolder.persistStats();
341
342 rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
343
344 System.out.println("rec: " + rec.toString());
345 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0,10.0,20.0"));
346 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0,1.0,2.0"));
347
348 }
349
350 @Test(groups = { "unitTest" })
351 public void testWindowStatExpiration() throws Exception
352 {
353 String persistentWildcardStat = "window(3)(dbFoo.partition*.latency)";
354 String persistentStat = "window(3)(dbFoo.partition10.latency)";
355
356 testWindowStat();
357
358 String incomingStatName = "dbFoo.partition10.latency";
359 Map<String, String> statFields = getStatFields("30", "3");
360 _statsHolder.applyStat(incomingStatName, statFields);
361 _statsHolder.persistStats();
362
363 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
364 Builder keyBuilder = accessor.keyBuilder();
365
366 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
367
368 System.out.println("rec: " + rec.toString());
369 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "10.0,20.0,30.0"));
370 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "1.0,2.0,3.0"));
371 }
372
373 @Test(groups = { "unitTest" })
374 public void testWindowStatStale() throws Exception
375 {
376 String persistentWildcardStat = "window(3)(dbFoo.partition*.latency)";
377 String persistentStat = "window(3)(dbFoo.partition10.latency)";
378
379 testWindowStat();
380
381 String incomingStatName = "dbFoo.partition10.latency";
382 Map<String, String> statFields = getStatFields("10", "1");
383 _statsHolder.applyStat(incomingStatName, statFields);
384 _statsHolder.persistStats();
385
386 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
387 Builder keyBuilder = accessor.keyBuilder();
388
389 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
390
391 System.out.println("rec: " + rec.toString());
392 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0,10.0,20.0"));
393 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0,1.0,2.0"));
394 }
395
396
397
398 @Test(groups = { "unitTest" })
399 public void testAddStatForTwoAggTypes() throws Exception
400 {
401
402 String persistentStatOne = "accumulate()(dbFoo.partition10.latency)";
403 String persistentStatTwo = "window(3)(dbFoo.partition10.latency)";
404 _statsHolder.addStat(persistentStatOne);
405 _statsHolder.persistStats();
406 _statsHolder.addStat(persistentStatTwo);
407 _statsHolder.persistStats();
408
409
410 String incomingStatName = "dbFoo.partition10.latency";
411 Map<String, String> statFields = getStatFields("0", "0");
412 _statsHolder.applyStat(incomingStatName, statFields);
413 _statsHolder.persistStats();
414
415
416 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
417 Builder keyBuilder = accessor.keyBuilder();
418
419 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
420
421 System.out.println("rec: " + rec.toString());
422 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStatOne, "0.0"));
423 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStatOne, "0.0"));
424 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStatTwo, "0.0"));
425 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStatTwo, "0.0"));
426 }
427
428
429 @Test(groups = { "unitTest" })
430 public void testMergeTwoWindowsYesMerge() throws Exception
431 {
432 String persistentWildcardStat = "window(3)(dbFoo.partition*.latency)";
433 String persistentStat = "window(3)(dbFoo.partition10.latency)";
434 String incomingStatName = "dbFoo.partition10.latency";
435
436 testWindowStat();
437
438
439 Tuple<String> valTuple = new Tuple<String>();
440 Tuple<String> timeTuple = new Tuple<String>();
441 valTuple.add("30.0");
442 valTuple.add("40.0");
443 timeTuple.add("3.0");
444 timeTuple.add("4.0");
445 Map<String, String> statFields =
446 getStatFields(valTuple.toString(), timeTuple.toString());
447 _statsHolder.applyStat(incomingStatName, statFields);
448 _statsHolder.persistStats();
449
450
451 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
452 Builder keyBuilder = accessor.keyBuilder();
453
454 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
455 System.out.println("rec: " + rec.toString());
456 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "20.0,30.0,40.0"));
457 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "2.0,3.0,4.0"));
458 }
459
460
461 @Test(groups = { "unitTest" })
462 public void testMergeTwoWindowsNoMerge() throws Exception
463 {
464 String persistentWildcardStat = "window(3)(dbFoo.partition*.latency)";
465 String persistentStat = "window(3)(dbFoo.partition10.latency)";
466 String incomingStatName = "dbFoo.partition10.latency";
467
468 testWindowStat();
469
470
471 Tuple<String> valTuple = new Tuple<String>();
472 Tuple<String> timeTuple = new Tuple<String>();
473 valTuple.add("0.0");
474 valTuple.add("40.0");
475 timeTuple.add("0.0");
476 timeTuple.add("4.0");
477 Map<String, String> statFields =
478 getStatFields(valTuple.toString(), timeTuple.toString());
479 _statsHolder.applyStat(incomingStatName, statFields);
480 _statsHolder.persistStats();
481
482
483 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
484 Builder keyBuilder = accessor.keyBuilder();
485
486 ZNRecord rec = accessor.getProperty(keyBuilder.persistantStat()).getRecord();
487 System.out.println("rec: " + rec.toString());
488 AssertJUnit.assertTrue(statRecordHasValue(rec, persistentStat, "0.0,10.0,20.0"));
489 AssertJUnit.assertTrue(statRecordHasTimestamp(rec, persistentStat, "0.0,1.0,2.0"));
490 }
491 }