1 package org.apache.helix.josql;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.apache.helix.HelixDataAccessor;
28 import org.apache.helix.HelixException;
29 import org.apache.helix.HelixManager;
30 import org.apache.helix.HelixProperty;
31 import org.apache.helix.PropertyType;
32 import org.apache.helix.ZNRecord;
33 import org.apache.helix.PropertyKey.Builder;
34 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
35 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
36 import org.apache.log4j.Logger;
37 import org.josql.Query;
38 import org.josql.QueryExecutionException;
39 import org.josql.QueryParseException;
40 import org.josql.QueryResults;
41
42
43 public class ClusterJosqlQueryProcessor
44 {
45 public static final String PARTITIONS = "PARTITIONS";
46 public static final String FLATTABLE = ".Table";
47
48 HelixManager _manager;
49 private static Logger _logger = Logger.getLogger(ClusterJosqlQueryProcessor.class);
50
51 public ClusterJosqlQueryProcessor(HelixManager manager)
52 {
53 _manager = manager;
54 }
55
56 String parseFromTarget(String sql)
57 {
58
59
60 int fromIndex = sql.indexOf("FROM");
61 if (fromIndex == -1)
62 {
63 throw new HelixException("Query must contain FROM target. Query: " + sql);
64 }
65
66
67
68
69 int nextSpace = sql.indexOf(" ", fromIndex);
70 while (sql.charAt(nextSpace) == ' ')
71 {
72 nextSpace++;
73 }
74 int nextnextSpace = sql.indexOf(" ", nextSpace);
75 if (nextnextSpace == -1)
76 {
77 nextnextSpace = sql.length();
78 }
79 String fromTarget = sql.substring(nextSpace, nextnextSpace).trim();
80
81 if (fromTarget.length() == 0)
82 {
83 throw new HelixException("FROM target in the query cannot be empty. Query: " + sql);
84 }
85 return fromTarget;
86 }
87
88 public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
89 List<Object> additionalFunctionHandlers, List queryTarget) throws QueryParseException,
90 QueryExecutionException
91 {
92 Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
93
94 josqlQuery.parse(josql);
95 QueryResults qr = josqlQuery.execute(queryTarget);
96
97 return qr.getResults();
98 }
99
100 Query prepareQuery(Map<String, Object> bindVariables, List<Object> additionalFunctionHandlers)
101 {
102
103 HelixDataAccessor accessor = _manager.getHelixDataAccessor();
104
105
106 Builder keyBuilder = accessor.keyBuilder();
107
108
109
110 List<ZNRecord> instanceConfigs = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
111
112 List<ZNRecord> liveInstances = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.liveInstances()));
113 List<ZNRecord> stateModelDefs = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.stateModelDefs()));
114
115
116 List<ZNRecord> idealStateList = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.idealStates()));
117
118 Map<String, ZNRecord> idealStatesMap = new HashMap<String, ZNRecord>();
119 for (ZNRecord idealState : idealStateList)
120 {
121 idealStatesMap.put(idealState.getId(), idealState);
122 }
123
124 List<ZNRecord> partitions = new ArrayList<ZNRecord>();
125 for (ZNRecord idealState : idealStateList)
126 {
127 for (String partitionName : idealState.getMapFields().keySet())
128 {
129 partitions.add(new ZNRecord(partitionName));
130 }
131 }
132
133 List<ZNRecord> externalViewList = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.externalViews()));
134
135
136 Map<String, ZNRecord> externalViewMap = new HashMap<String, ZNRecord>();
137 for (ZNRecord externalView : externalViewList)
138 {
139 externalViewMap.put(externalView.getId(), externalView);
140 }
141
142 Map<String, Map<String, ZNRecord>> currentStatesMap = new HashMap<String, Map<String, ZNRecord>>();
143
144 Map<String, List<ZNRecordRow>> flatCurrentStateMap = new HashMap<String, List<ZNRecordRow>>();
145
146 for (ZNRecord instance : liveInstances)
147 {
148 String host = instance.getId();
149 String sessionId = instance.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
150 Map<String, ZNRecord> currentStates = new HashMap<String, ZNRecord>();
151 List<ZNRecord> instanceCurrentStateList = new ArrayList<ZNRecord>();
152 for (ZNRecord idealState : idealStateList)
153 {
154 String resourceName = idealState.getId();
155
156 HelixProperty property = accessor.getProperty(keyBuilder.currentState(host, sessionId, resourceName));
157 ZNRecord currentState =null;
158 if (property == null)
159 {
160 _logger.warn("Resource " + resourceName + " has null currentState");
161 currentState = new ZNRecord(resourceName);
162 }else{
163 currentState = property.getRecord();
164 }
165 currentStates.put(resourceName, currentState);
166 instanceCurrentStateList.add(currentState);
167 }
168 currentStatesMap.put(host, currentStates);
169 flatCurrentStateMap.put(host, ZNRecordRow.flatten(instanceCurrentStateList));
170 }
171 Query josqlQuery = new Query();
172
173
174 josqlQuery
175 .setVariable(
176 PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString(),
177 instanceConfigs);
178 josqlQuery.setVariable(PropertyType.IDEALSTATES.toString(), idealStatesMap);
179 josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString(), liveInstances);
180 josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString(), stateModelDefs);
181 josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString(), externalViewMap);
182 josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString(), currentStatesMap);
183 josqlQuery.setVariable(PARTITIONS, partitions);
184
185
186 josqlQuery.setVariable(
187 PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString()
188 + FLATTABLE,
189 ZNRecordRow.flatten(instanceConfigs));
190 josqlQuery.setVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE,
191 ZNRecordRow.flatten(idealStateList));
192 josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE,
193 ZNRecordRow.flatten(liveInstances));
194 josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE,
195 ZNRecordRow.flatten(stateModelDefs));
196 josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE,
197 ZNRecordRow.flatten(externalViewList));
198 josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString() + FLATTABLE,
199 flatCurrentStateMap.values());
200 josqlQuery.setVariable(PARTITIONS + FLATTABLE, ZNRecordRow.flatten(partitions));
201
202 if (bindVariables != null)
203 {
204 for (String key : bindVariables.keySet())
205 {
206 josqlQuery.setVariable(key, bindVariables.get(key));
207 }
208 }
209
210 josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
211 josqlQuery.addFunctionHandler(new ZNRecordRow());
212 josqlQuery.addFunctionHandler(new Integer(0));
213 if (additionalFunctionHandlers != null)
214 {
215 for (Object functionHandler : additionalFunctionHandlers)
216 {
217 josqlQuery.addFunctionHandler(functionHandler);
218 }
219 }
220 return josqlQuery;
221 }
222
223 public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
224 List<Object> additionalFunctionHandlers) throws QueryParseException, QueryExecutionException
225 {
226 Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
227
228
229
230
231
232
233
234
235 String fromTargetString = parseFromTarget(josql);
236
237 List fromTargetList = null;
238 Object fromTarget = null;
239 if (fromTargetString.equalsIgnoreCase(PARTITIONS))
240 {
241 fromTarget = josqlQuery.getVariable(PARTITIONS.toString());
242 } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString()))
243 {
244 fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString());
245 } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
246 + ConfigScopeProperty.PARTICIPANT.toString()))
247 {
248 fromTarget = josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
249 + ConfigScopeProperty.PARTICIPANT.toString());
250 } else if (fromTargetString.equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString()))
251 {
252 fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString());
253 } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString()))
254 {
255 fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString());
256 }
257 else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString()))
258 {
259 fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString());
260 }
261
262 else if (fromTargetString.equalsIgnoreCase(PARTITIONS + FLATTABLE))
263 {
264 fromTarget = josqlQuery.getVariable(PARTITIONS.toString() + FLATTABLE);
265 } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString() + FLATTABLE))
266 {
267 fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE);
268 } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
269 + ConfigScopeProperty.PARTICIPANT.toString()
270 + FLATTABLE))
271 {
272 fromTarget = josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
273 + ConfigScopeProperty.PARTICIPANT.toString() + FLATTABLE);
274 } else if (fromTargetString
275 .equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString() + FLATTABLE))
276 {
277 fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE);
278 } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString() + FLATTABLE))
279 {
280 fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE);
281 }
282 else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString() + FLATTABLE))
283 {
284 fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE);
285 }
286 else
287 {
288 throw new HelixException(
289 "Unknown query target "
290 + fromTargetString
291 + ". Target should be PARTITIONS, LIVEINSTANCES, CONFIGS, STATEMODELDEFS, IDEALSTATES, EXTERNALVIEW, and corresponding flat Tables");
292 }
293
294 fromTargetList = fromTargetString.endsWith(FLATTABLE) ? ((List<ZNRecordRow>) fromTarget)
295 : ((List<ZNRecord>) fromTarget);
296
297
298
299
300 josql = josql.replaceFirst(
301 fromTargetString,
302 fromTargetString.endsWith(FLATTABLE) ? ZNRecordRow.class.getName() : ZNRecord.class
303 .getName());
304 josqlQuery.parse(josql);
305 QueryResults qr = josqlQuery.execute(fromTargetList);
306 return qr.getResults();
307 }
308 }