1 package org.apache.helix.alerts;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Iterator;
23
24 import org.apache.helix.HelixException;
25
26
27 public class WindowAggregator extends Aggregator {
28
29
30 int _windowSize;
31
32 public WindowAggregator(String windowSize)
33 {
34 _windowSize = Integer.parseInt(windowSize);
35 _numArgs = 1;
36 }
37
38 public WindowAggregator()
39 {
40 this("1");
41 }
42
43 @Override
44 public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
45 Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
46
47 _windowSize = Integer.parseInt(args[0]);
48
49
50 Tuple<String> mergedTimeTuple = new Tuple<String>();
51 Tuple<String> mergedValTuple = new Tuple<String>();
52
53 Iterator<String> currTimeIter = currTimeTup.iterator();
54 Iterator<String> currValIter = currValTup.iterator();
55 Iterator<String> newTimeIter = newTimeTup.iterator();
56 Iterator<String> newValIter = newValTup.iterator();
57 int currCtr = 0;
58
59 double currTime = -1;
60 double currVal;
61 while (currTimeIter.hasNext()) {
62 currTime = Double.parseDouble(currTimeIter.next());
63 currVal = Double.parseDouble(currValIter.next());
64 currCtr++;
65
66 if (currCtr > (newTimeTup.size()+currTimeTup.size()-_windowSize)) {
67 mergedTimeTuple.add(String.valueOf(currTime));
68 mergedValTuple.add(String.valueOf(currVal));
69 }
70 }
71
72 double newVal;
73 double newTime;
74 while (newTimeIter.hasNext()) {
75 newVal = Double.parseDouble(newValIter.next());
76 newTime = Double.parseDouble(newTimeIter.next());
77 if (newTime <= currTime) {
78 return;
79 }
80 currCtr++;
81 if (currCtr > (newTimeTup.size()+currTimeTup.size()-_windowSize)) {
82 mergedTimeTuple.add(String.valueOf(newTime));
83 mergedValTuple.add(String.valueOf(newVal));
84 }
85 }
86
87 currTimeTup.clear();
88 currTimeTup.addAll(mergedTimeTuple);
89 currValTup.clear();
90 currValTup.addAll(mergedValTuple);
91
92 }
93
94
95 }