View Javadoc

1   package org.apache.helix;
2   /*
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   */
20  //import java.util.HashMap;
21  import java.util.HashSet;
22  import java.util.Map;
23  //import java.util.Map.Entry;
24  import java.util.concurrent.ConcurrentHashMap;
25  //import java.util.concurrent.ConcurrentMap;
26  
27  import org.apache.helix.EspressoStorageMockStateModelFactory;
28  import org.apache.helix.HelixDataAccessor;
29  import org.apache.helix.PropertyType;
30  import org.apache.helix.ZNRecord;
31  import org.apache.helix.PropertyKey.Builder;
32  import org.apache.helix.healthcheck.PerformanceHealthReportProvider;
33  import org.apache.helix.healthcheck.StatHealthReportProvider;
34  import org.apache.helix.model.IdealState;
35  import org.apache.helix.model.Message.MessageType;
36  import org.apache.helix.participant.StateMachineEngine;
37  import org.apache.helix.participant.statemachine.StateModel;
38  import org.apache.log4j.Logger;
39  
40  public class EspressoStorageMockNode extends MockNode {
41  
42  	private static final Logger logger = Logger
43  			.getLogger(EspressoStorageMockNode.class);
44  
45  	private final String GET_STAT_NAME = "get";
46  	private final String SET_STAT_NAME = "set";
47  	private final String COUNT_STAT_TYPE = "count";
48  	private final String REPORT_NAME = "ParticipantStats";
49  
50  	StatHealthReportProvider _healthProvider;
51  	//PerformanceHealthReportProvider _healthProvider;
52  	EspressoStorageMockStateModelFactory _stateModelFactory;
53  
54  	HashSet<String>_partitions;
55  
56  	ConcurrentHashMap<String, String> _keyValueMap;
57  	FnvHashFunction _hashFunction;
58  	int _numTotalEspressoPartitions = 0;
59  
60  	public EspressoStorageMockNode(CMConnector cm) {
61  		super(cm);
62  		_stateModelFactory = new EspressoStorageMockStateModelFactory(0);
63  
64  //		StateMachineEngine genericStateMachineHandler = new StateMachineEngine();
65  		StateMachineEngine stateMach = _cmConnector.getManager().getStateMachineEngine();
66  		stateMach.registerStateModelFactory("MasterSlave", _stateModelFactory);
67  //		_cmConnector
68  //				.getManager()
69  //				.getMessagingService()
70  //				.registerMessageHandlerFactory(
71  //						MessageType.STATE_TRANSITION.toString(),
72  //						genericStateMachineHandler);
73          /*
74  		_healthProvider = new StatHealthReportProvider();
75  		_healthProvider.setReportName(REPORT_NAME);
76         */
77  
78  		_healthProvider = new StatHealthReportProvider();
79  		//_healthProvider.setReportName(REPORT_NAME);
80  
81  		_cmConnector.getManager().getHealthReportCollector()
82  				.addHealthReportProvider(_healthProvider);
83  		_partitions = new HashSet<String>();
84  		_keyValueMap = new ConcurrentHashMap<String, String>();
85  		_hashFunction = new FnvHashFunction();
86  
87  		//start thread to keep checking what partitions this node owns
88  		//Thread partitionGetter = new Thread(new PartitionGetterThread());
89  		//partitionGetter.start();
90  		//logger.debug("set partition getter thread to run");
91  	}
92  
93  	public String formStatName(String dbName, String partitionName, String metricName)
94  	{
95  		String statName;
96  		statName = "db"+dbName+".partition"+partitionName+"."+metricName;
97  		return statName;
98  
99  	}
100 
101 	public String doGet(String dbId, String key) {
102 		String partition = getPartitionName(dbId, getKeyPartition(dbId, key));
103 		if (!isPartitionOwnedByNode(partition)) {
104 			logger.error("Key "+key+" hashed to partition "+partition +" but this node does not own it.");
105 			return null;
106 		}
107 
108 		//_healthProvider.submitIncrementPartitionRequestCount(partition);
109 		//_healthProvider.incrementPartitionStat(GET_STAT_NAME, partition);
110 		_healthProvider.incrementStat(formStatName(dbId, partition, "getCount"), String.valueOf(System.currentTimeMillis()));
111 		return _keyValueMap.get(key);
112 	}
113 
114 	public void doPut(String dbId, String key, String value) {
115 		String partition = getPartitionName(dbId, getKeyPartition(dbId, key));
116 		if (!isPartitionOwnedByNode(partition)) {
117 			logger.error("Key "+key+" hashed to partition "+partition +" but this node does not own it.");
118 			return;
119 		}
120 
121 		//_healthProvider.submitIncrementPartitionRequestCount(partition);
122 		//_healthProvider.incrementPartitionStat(SET_STAT_NAME, partition);
123 		//_healthProvider.incrementStat(SET_STAT_NAME, COUNT_STAT_TYPE,
124 		//		dbId, partition, "FIXMENODENAME", String.valueOf(System.currentTimeMillis()));
125 		_healthProvider.incrementStat(formStatName(dbId, partition, "putCount"), String.valueOf(System.currentTimeMillis()));
126 
127 		_keyValueMap.put(key, value);
128 	}
129 
130 	private String getPartitionName(String databaseName, int partitionNum) {
131 		return databaseName+"_"+partitionNum;
132 	}
133 
134 	private boolean isPartitionOwnedByNode(String partitionName) {
135 		Map<String, StateModel> stateModelMap = _stateModelFactory
136 				.getStateModelMap();
137 		logger.debug("state model map size: "+stateModelMap.size());
138 
139 		return (stateModelMap.keySet().contains(partitionName));
140 	}
141 
142 	private int getKeyPartition(String dbName, String key) {
143 		int numPartitions = getNumPartitions(dbName);
144 		logger.debug("numPartitions: "+numPartitions);
145 		int part = Math.abs((int)_hashFunction.hash(key.getBytes(), numPartitions));
146 		logger.debug("part: "+part);
147 		return part;
148 	}
149 
150 	private int getNumPartitions(String dbName) {
151 		logger.debug("dbName: "+dbName);
152 		HelixDataAccessor helixDataAccessor = _cmConnector.getManager().getHelixDataAccessor();
153 		Builder keyBuilder = helixDataAccessor.keyBuilder();
154     ZNRecord rec = helixDataAccessor.getProperty(keyBuilder.idealStates(dbName)).getRecord();
155 		if (rec == null) {
156 			logger.debug("rec is null");
157 		}
158 		IdealState state = new IdealState(rec);
159 		return state.getNumPartitions();
160 	}
161 
162 	class PartitionGetterThread implements Runnable {
163 
164 		@Override
165 		public void run() {
166 			while (true) {
167 				synchronized (_partitions) {
168 					//logger.debug("Building partition map");
169 					_partitions.clear();
170 					Map<String, StateModel> stateModelMap = _stateModelFactory
171 							.getStateModelMap();
172 					for (String s: stateModelMap.keySet()) {
173 						logger.debug("adding key "+s);
174 						_partitions.add(s);
175 					}
176 				}
177 				//sleep for 60 seconds
178 				try {
179 					Thread.sleep(60000);
180 				} catch (InterruptedException e) {
181 					// TODO Auto-generated catch block
182 					e.printStackTrace();
183 				}
184 			}
185 		}
186 	}
187 
188 	@Override
189 	public void run() {
190 
191 	}
192 
193 
194 
195 
196 }