View Javadoc

1   package org.apache.helix.alerts;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.helix.HelixException;
23  
24  
25  public class AccumulateAggregator extends Aggregator {
26  
27  	
28  	public AccumulateAggregator() 
29  	{
30  		_numArgs = 0;
31  	}
32  	
33  	@Override
34  	public void merge(Tuple<String> currValTup, Tuple<String> newValTup,
35  			Tuple<String> currTimeTup, Tuple<String> newTimeTup, String... args) {
36  	
37  		double currVal = 0;
38  		double currTime = -1;
39  		double newVal;
40  		double newTime;
41  		double mergedVal;
42  		double mergedTime;
43  		
44  		if (currValTup == null || newValTup == null || currTimeTup == null ||
45  				newTimeTup == null) {
46  			throw new HelixException("Tuples cannot be null");
47  		}
48  		
49  		//old tuples may be empty, indicating no value/time exist
50  		if (currValTup.size() > 0 && currTimeTup.size() > 0) {
51  			currVal = Double.parseDouble(currValTup.iterator().next());
52  			currTime = Double.parseDouble(currTimeTup.iterator().next());
53  		}
54  		newVal = Double.parseDouble(newValTup.iterator().next());
55  		newTime = Double.parseDouble(newTimeTup.iterator().next());
56  		
57  		if (newTime > currTime) { //if old doesn't exist, we end up here
58  			mergedVal = currVal+newVal; //if old doesn't exist, it has value "0"
59  			mergedTime = newTime;
60  		}
61  		else {
62  			mergedVal = currVal;
63  			mergedTime = currTime;
64  		}
65  	
66  		currValTup.clear();
67  		currValTup.add(Double.toString(mergedVal));
68  		currTimeTup.clear();
69  		currTimeTup.add(Double.toString(mergedTime));
70  	}
71  
72  	
73  }