1 package org.apache.helix;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 import java.util.HashSet;
22 import java.util.Map;
23
24 import java.util.concurrent.ConcurrentHashMap;
25
26
27 import org.apache.helix.EspressoStorageMockStateModelFactory;
28 import org.apache.helix.HelixDataAccessor;
29 import org.apache.helix.PropertyType;
30 import org.apache.helix.ZNRecord;
31 import org.apache.helix.PropertyKey.Builder;
32 import org.apache.helix.healthcheck.PerformanceHealthReportProvider;
33 import org.apache.helix.healthcheck.StatHealthReportProvider;
34 import org.apache.helix.model.IdealState;
35 import org.apache.helix.model.Message.MessageType;
36 import org.apache.helix.participant.StateMachineEngine;
37 import org.apache.helix.participant.statemachine.StateModel;
38 import org.apache.log4j.Logger;
39
40 public class EspressoStorageMockNode extends MockNode {
41
42 private static final Logger logger = Logger
43 .getLogger(EspressoStorageMockNode.class);
44
45 private final String GET_STAT_NAME = "get";
46 private final String SET_STAT_NAME = "set";
47 private final String COUNT_STAT_TYPE = "count";
48 private final String REPORT_NAME = "ParticipantStats";
49
50 StatHealthReportProvider _healthProvider;
51
52 EspressoStorageMockStateModelFactory _stateModelFactory;
53
54 HashSet<String>_partitions;
55
56 ConcurrentHashMap<String, String> _keyValueMap;
57 FnvHashFunction _hashFunction;
58 int _numTotalEspressoPartitions = 0;
59
60 public EspressoStorageMockNode(CMConnector cm) {
61 super(cm);
62 _stateModelFactory = new EspressoStorageMockStateModelFactory(0);
63
64
65 StateMachineEngine stateMach = _cmConnector.getManager().getStateMachineEngine();
66 stateMach.registerStateModelFactory("MasterSlave", _stateModelFactory);
67
68
69
70
71
72
73
74
75
76
77
78 _healthProvider = new StatHealthReportProvider();
79
80
81 _cmConnector.getManager().getHealthReportCollector()
82 .addHealthReportProvider(_healthProvider);
83 _partitions = new HashSet<String>();
84 _keyValueMap = new ConcurrentHashMap<String, String>();
85 _hashFunction = new FnvHashFunction();
86
87
88
89
90
91 }
92
93 public String formStatName(String dbName, String partitionName, String metricName)
94 {
95 String statName;
96 statName = "db"+dbName+".partition"+partitionName+"."+metricName;
97 return statName;
98
99 }
100
101 public String doGet(String dbId, String key) {
102 String partition = getPartitionName(dbId, getKeyPartition(dbId, key));
103 if (!isPartitionOwnedByNode(partition)) {
104 logger.error("Key "+key+" hashed to partition "+partition +" but this node does not own it.");
105 return null;
106 }
107
108
109
110 _healthProvider.incrementStat(formStatName(dbId, partition, "getCount"), String.valueOf(System.currentTimeMillis()));
111 return _keyValueMap.get(key);
112 }
113
114 public void doPut(String dbId, String key, String value) {
115 String partition = getPartitionName(dbId, getKeyPartition(dbId, key));
116 if (!isPartitionOwnedByNode(partition)) {
117 logger.error("Key "+key+" hashed to partition "+partition +" but this node does not own it.");
118 return;
119 }
120
121
122
123
124
125 _healthProvider.incrementStat(formStatName(dbId, partition, "putCount"), String.valueOf(System.currentTimeMillis()));
126
127 _keyValueMap.put(key, value);
128 }
129
130 private String getPartitionName(String databaseName, int partitionNum) {
131 return databaseName+"_"+partitionNum;
132 }
133
134 private boolean isPartitionOwnedByNode(String partitionName) {
135 Map<String, StateModel> stateModelMap = _stateModelFactory
136 .getStateModelMap();
137 logger.debug("state model map size: "+stateModelMap.size());
138
139 return (stateModelMap.keySet().contains(partitionName));
140 }
141
142 private int getKeyPartition(String dbName, String key) {
143 int numPartitions = getNumPartitions(dbName);
144 logger.debug("numPartitions: "+numPartitions);
145 int part = Math.abs((int)_hashFunction.hash(key.getBytes(), numPartitions));
146 logger.debug("part: "+part);
147 return part;
148 }
149
150 private int getNumPartitions(String dbName) {
151 logger.debug("dbName: "+dbName);
152 HelixDataAccessor helixDataAccessor = _cmConnector.getManager().getHelixDataAccessor();
153 Builder keyBuilder = helixDataAccessor.keyBuilder();
154 ZNRecord rec = helixDataAccessor.getProperty(keyBuilder.idealStates(dbName)).getRecord();
155 if (rec == null) {
156 logger.debug("rec is null");
157 }
158 IdealState state = new IdealState(rec);
159 return state.getNumPartitions();
160 }
161
162 class PartitionGetterThread implements Runnable {
163
164 @Override
165 public void run() {
166 while (true) {
167 synchronized (_partitions) {
168
169 _partitions.clear();
170 Map<String, StateModel> stateModelMap = _stateModelFactory
171 .getStateModelMap();
172 for (String s: stateModelMap.keySet()) {
173 logger.debug("adding key "+s);
174 _partitions.add(s);
175 }
176 }
177
178 try {
179 Thread.sleep(60000);
180 } catch (InterruptedException e) {
181
182 e.printStackTrace();
183 }
184 }
185 }
186 }
187
188 @Override
189 public void run() {
190
191 }
192
193
194
195
196 }