View Javadoc

1   package org.apache.helix.alerts;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Iterator;
23  
24  import org.apache.helix.HelixException;
25  
26  
27  public class WindowAggregator extends Aggregator {
28  
29  	
30  	int _windowSize;
31  	
32  	public WindowAggregator(String windowSize) 
33  	{
34  		_windowSize = Integer.parseInt(windowSize);
35  		_numArgs = 1;
36  	}
37  	
38  	public WindowAggregator()
39  	{
40  		this("1");
41  	}
42  
43  	@Override
44  	public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
45  			Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
46  		
47  		_windowSize = Integer.parseInt(args[0]);
48  		
49  		//figure out how many curr tuple values we displace
50  		Tuple<String> mergedTimeTuple = new Tuple<String>();
51  		Tuple<String> mergedValTuple = new Tuple<String>();
52  		
53  		Iterator<String> currTimeIter = currTimeTup.iterator();
54  		Iterator<String> currValIter = currValTup.iterator();
55  		Iterator<String> newTimeIter = newTimeTup.iterator();
56  		Iterator<String> newValIter = newValTup.iterator();
57  		int currCtr = 0;
58  		//traverse current vals
59  		double currTime = -1;
60  		double currVal;
61  		while (currTimeIter.hasNext()) {
62  			currTime = Double.parseDouble(currTimeIter.next());
63  			currVal = Double.parseDouble(currValIter.next());
64  			currCtr++;
65  			//number of evicted currVals equal to total size of both minus _windowSize
66  			if (currCtr > (newTimeTup.size()+currTimeTup.size()-_windowSize)) { //non-evicted element, just bump down
67  				mergedTimeTuple.add(String.valueOf(currTime));
68  				mergedValTuple.add(String.valueOf(currVal));
69  			}
70  		}
71  
72  		double newVal;
73  		double newTime;
74  		while (newTimeIter.hasNext()) {
75  			newVal = Double.parseDouble(newValIter.next());
76  			newTime = Double.parseDouble(newTimeIter.next());
77  			if (newTime <= currTime) { //oldest new time older than newest curr time.  we will not apply new tuple!
78  				return; //curr tuples remain the same
79  			}
80  			currCtr++;
81  			if (currCtr > (newTimeTup.size()+currTimeTup.size()-_windowSize)) { //non-evicted element
82  				mergedTimeTuple.add(String.valueOf(newTime));
83  				mergedValTuple.add(String.valueOf(newVal));
84  			}
85  		}
86  		 //set curr tuples to merged tuples
87  		currTimeTup.clear();
88  		currTimeTup.addAll(mergedTimeTuple);
89  		currValTup.clear();
90  		currValTup.addAll(mergedValTuple);
91  		//TODO: see if we can do merger in place on curr
92  	}
93  
94  	
95  }