1 package org.apache.helix.alerts;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.helix.HelixException;
23
24
25 public class AccumulateAggregator extends Aggregator {
26
27
28 public AccumulateAggregator()
29 {
30 _numArgs = 0;
31 }
32
33 @Override
34 public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
35 Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
36
37 double currVal = 0;
38 double currTime = -1;
39 double newVal;
40 double newTime;
41 double mergedVal;
42 double mergedTime;
43
44 if (currValTup == null || newValTup == null || currTimeTup == null ||
45 newTimeTup == null) {
46 throw new HelixException("Tuples cannot be null");
47 }
48
49
50 if (currValTup.size() > 0 && currTimeTup.size() > 0) {
51 currVal = Double.parseDouble(currValTup.iterator().next());
52 currTime = Double.parseDouble(currTimeTup.iterator().next());
53 }
54 newVal = Double.parseDouble(newValTup.iterator().next());
55 newTime = Double.parseDouble(newTimeTup.iterator().next());
56
57 if (newTime > currTime) {
58 mergedVal = currVal+newVal;
59 mergedTime = newTime;
60 }
61 else {
62 mergedVal = currVal;
63 mergedTime = currTime;
64 }
65
66 currValTup.clear();
67 currValTup.add(Double.toString(mergedVal));
68 currTimeTup.clear();
69 currTimeTup.add(Double.toString(mergedTime));
70 }
71
72
73 }