View Javadoc

1   package org.apache.helix.josql;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.apache.helix.HelixDataAccessor;
28  import org.apache.helix.HelixException;
29  import org.apache.helix.HelixManager;
30  import org.apache.helix.HelixProperty;
31  import org.apache.helix.PropertyType;
32  import org.apache.helix.ZNRecord;
33  import org.apache.helix.PropertyKey.Builder;
34  import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
35  import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
36  import org.apache.log4j.Logger;
37  import org.josql.Query;
38  import org.josql.QueryExecutionException;
39  import org.josql.QueryParseException;
40  import org.josql.QueryResults;
41  
42  
43  public class ClusterJosqlQueryProcessor
44  {
45    public static final String PARTITIONS = "PARTITIONS";
46    public static final String FLATTABLE = ".Table";
47  
48    HelixManager _manager;
49    private static Logger _logger = Logger.getLogger(ClusterJosqlQueryProcessor.class);
50  
51    public ClusterJosqlQueryProcessor(HelixManager manager)
52    {
53      _manager = manager;
54    }
55  
56    String parseFromTarget(String sql)
57    {
58      // We need to find out the "FROM" target, and replace it with liveInstances
59      // / partitions etc
60      int fromIndex = sql.indexOf("FROM");
61      if (fromIndex == -1)
62      {
63        throw new HelixException("Query must contain FROM target. Query: " + sql);
64      }
65      // Per JoSql, select FROM <target> the target must be a object class that
66      // corresponds to a "table row"
67      // In out case, the row is always a ZNRecord
68  
69      int nextSpace = sql.indexOf(" ", fromIndex);
70      while (sql.charAt(nextSpace) == ' ')
71      {
72        nextSpace++;
73      }
74      int nextnextSpace = sql.indexOf(" ", nextSpace);
75      if (nextnextSpace == -1)
76      {
77        nextnextSpace = sql.length();
78      }
79      String fromTarget = sql.substring(nextSpace, nextnextSpace).trim();
80  
81      if (fromTarget.length() == 0)
82      {
83        throw new HelixException("FROM target in the query cannot be empty. Query: " + sql);
84      }
85      return fromTarget;
86    }
87  
88    public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
89        List<Object> additionalFunctionHandlers, List queryTarget) throws QueryParseException,
90        QueryExecutionException
91    {
92      Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
93  
94      josqlQuery.parse(josql);
95      QueryResults qr = josqlQuery.execute(queryTarget);
96  
97      return qr.getResults();
98    }
99  
100   Query prepareQuery(Map<String, Object> bindVariables, List<Object> additionalFunctionHandlers)
101   {
102     // DataAccessor accessor = _manager.getDataAccessor();
103     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
104     
105     // Get all the ZNRecords in the cluster and set them as bind variables
106     Builder keyBuilder = accessor.keyBuilder();
107 //    List<ZNRecord> instanceConfigs = accessor.getChildValues(PropertyType.CONFIGS,
108 //        ConfigScopeProperty.PARTICIPANT.toString());
109     
110     List<ZNRecord> instanceConfigs = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs()));
111 
112     List<ZNRecord> liveInstances = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.liveInstances()));
113     List<ZNRecord> stateModelDefs = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.stateModelDefs()));
114     
115     // Idealstates are stored in a map from resource name to idealState ZNRecord
116     List<ZNRecord> idealStateList = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.idealStates()));
117     
118     Map<String, ZNRecord> idealStatesMap = new HashMap<String, ZNRecord>();
119     for (ZNRecord idealState : idealStateList)
120     {
121       idealStatesMap.put(idealState.getId(), idealState);
122     }
123     // Make up the partition list: for selecting partitions
124     List<ZNRecord> partitions = new ArrayList<ZNRecord>();
125     for (ZNRecord idealState : idealStateList)
126     {
127       for (String partitionName : idealState.getMapFields().keySet())
128       {
129         partitions.add(new ZNRecord(partitionName));
130       }
131     }
132     
133     List<ZNRecord> externalViewList = HelixProperty.convertToList(accessor.getChildValues(keyBuilder.externalViews()));
134     // ExternalViews are stored in a map from resource name to idealState
135     // ZNRecord
136     Map<String, ZNRecord> externalViewMap = new HashMap<String, ZNRecord>();
137     for (ZNRecord externalView : externalViewList)
138     {
139       externalViewMap.put(externalView.getId(), externalView);
140     }
141     // Map from instance name to a map from resource to current state ZNRecord
142     Map<String, Map<String, ZNRecord>> currentStatesMap = new HashMap<String, Map<String, ZNRecord>>();
143     // Map from instance name to a list of combined flat ZNRecordRow
144     Map<String, List<ZNRecordRow>> flatCurrentStateMap = new HashMap<String, List<ZNRecordRow>>();
145 
146     for (ZNRecord instance : liveInstances)
147     {
148       String host = instance.getId();
149       String sessionId = instance.getSimpleField(LiveInstanceProperty.SESSION_ID.toString());
150       Map<String, ZNRecord> currentStates = new HashMap<String, ZNRecord>();
151       List<ZNRecord> instanceCurrentStateList = new ArrayList<ZNRecord>();
152       for (ZNRecord idealState : idealStateList)
153       {
154         String resourceName = idealState.getId();
155         
156         HelixProperty property = accessor.getProperty(keyBuilder.currentState(host, sessionId, resourceName));
157         ZNRecord currentState =null;
158         if (property == null)
159         {
160           _logger.warn("Resource " + resourceName + " has null currentState");
161           currentState = new ZNRecord(resourceName);
162         }else{
163           currentState = property.getRecord();
164         }
165         currentStates.put(resourceName, currentState);
166         instanceCurrentStateList.add(currentState);
167       }
168       currentStatesMap.put(host, currentStates);
169       flatCurrentStateMap.put(host, ZNRecordRow.flatten(instanceCurrentStateList));
170     }
171     Query josqlQuery = new Query();
172 
173     // Set the default bind variables
174     josqlQuery
175 .setVariable(
176         PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString(),
177             instanceConfigs);
178     josqlQuery.setVariable(PropertyType.IDEALSTATES.toString(), idealStatesMap);
179     josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString(), liveInstances);
180     josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString(), stateModelDefs);
181     josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString(), externalViewMap);
182     josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString(), currentStatesMap);
183     josqlQuery.setVariable(PARTITIONS, partitions);
184 
185     // Flat version of ZNRecords
186     josqlQuery.setVariable(
187         PropertyType.CONFIGS.toString() + "/" + ConfigScopeProperty.PARTICIPANT.toString()
188             + FLATTABLE,
189         ZNRecordRow.flatten(instanceConfigs));
190     josqlQuery.setVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE,
191         ZNRecordRow.flatten(idealStateList));
192     josqlQuery.setVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE,
193         ZNRecordRow.flatten(liveInstances));
194     josqlQuery.setVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE,
195         ZNRecordRow.flatten(stateModelDefs));
196     josqlQuery.setVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE,
197         ZNRecordRow.flatten(externalViewList));
198     josqlQuery.setVariable(PropertyType.CURRENTSTATES.toString() + FLATTABLE,
199         flatCurrentStateMap.values());
200     josqlQuery.setVariable(PARTITIONS + FLATTABLE, ZNRecordRow.flatten(partitions));
201     // Set additional bind variables
202     if (bindVariables != null)
203     {
204       for (String key : bindVariables.keySet())
205       {
206         josqlQuery.setVariable(key, bindVariables.get(key));
207       }
208     }
209 
210     josqlQuery.addFunctionHandler(new ZNRecordJosqlFunctionHandler());
211     josqlQuery.addFunctionHandler(new ZNRecordRow());
212     josqlQuery.addFunctionHandler(new Integer(0));
213     if (additionalFunctionHandlers != null)
214     {
215       for (Object functionHandler : additionalFunctionHandlers)
216       {
217         josqlQuery.addFunctionHandler(functionHandler);
218       }
219     }
220     return josqlQuery;
221   }
222 
223   public List<Object> runJoSqlQuery(String josql, Map<String, Object> bindVariables,
224       List<Object> additionalFunctionHandlers) throws QueryParseException, QueryExecutionException
225   {
226     Query josqlQuery = prepareQuery(bindVariables, additionalFunctionHandlers);
227 
228     // Per JoSql, select FROM <target> the target must be a object class that
229     // corresponds to a "table row",
230     // while the table (list of Objects) are put in the query by
231     // query.execute(List<Object>). In the input,
232     // In out case, the row is always a ZNRecord. But in SQL, the from target is
233     // a "table name".
234 
235     String fromTargetString = parseFromTarget(josql);
236 
237     List fromTargetList = null;
238     Object fromTarget = null;
239     if (fromTargetString.equalsIgnoreCase(PARTITIONS))
240     {
241       fromTarget = josqlQuery.getVariable(PARTITIONS.toString());
242     } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString()))
243     {
244       fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString());
245     } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
246         + ConfigScopeProperty.PARTICIPANT.toString()))
247     {
248       fromTarget = josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
249           + ConfigScopeProperty.PARTICIPANT.toString());
250     } else if (fromTargetString.equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString()))
251     {
252       fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString());
253     } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString()))
254     {
255       fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString());
256     } 
257     else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString()))
258     {
259       fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString());
260     }
261     
262     else if (fromTargetString.equalsIgnoreCase(PARTITIONS + FLATTABLE))
263     {
264       fromTarget = josqlQuery.getVariable(PARTITIONS.toString() + FLATTABLE);
265     } else if (fromTargetString.equalsIgnoreCase(PropertyType.LIVEINSTANCES.toString() + FLATTABLE))
266     {
267       fromTarget = josqlQuery.getVariable(PropertyType.LIVEINSTANCES.toString() + FLATTABLE);
268     } else if (fromTargetString.equalsIgnoreCase(PropertyType.CONFIGS.toString() + "/"
269         + ConfigScopeProperty.PARTICIPANT.toString()
270         + FLATTABLE))
271     {
272       fromTarget = josqlQuery.getVariable(PropertyType.CONFIGS.toString() + "/"
273           + ConfigScopeProperty.PARTICIPANT.toString() + FLATTABLE);
274     } else if (fromTargetString
275         .equalsIgnoreCase(PropertyType.STATEMODELDEFS.toString() + FLATTABLE))
276     {
277       fromTarget = josqlQuery.getVariable(PropertyType.STATEMODELDEFS.toString() + FLATTABLE);
278     } else if (fromTargetString.equalsIgnoreCase(PropertyType.EXTERNALVIEW.toString() + FLATTABLE))
279     {
280       fromTarget = josqlQuery.getVariable(PropertyType.EXTERNALVIEW.toString() + FLATTABLE);
281     }
282     else if (fromTargetString.equalsIgnoreCase(PropertyType.IDEALSTATES.toString() + FLATTABLE))
283     {
284       fromTarget = josqlQuery.getVariable(PropertyType.IDEALSTATES.toString() + FLATTABLE);
285     }
286     else
287     {
288       throw new HelixException(
289           "Unknown query target "
290               + fromTargetString
291               + ". Target should be PARTITIONS, LIVEINSTANCES, CONFIGS, STATEMODELDEFS, IDEALSTATES, EXTERNALVIEW, and corresponding flat Tables");
292     }
293 
294     fromTargetList = fromTargetString.endsWith(FLATTABLE) ? ((List<ZNRecordRow>) fromTarget)
295         : ((List<ZNRecord>) fromTarget);
296 
297     // Per JoSql, select FROM <target> the target must be a object class that
298     // corresponds to a "table row"
299     // In out case, the row is always a ZNRecord
300     josql = josql.replaceFirst(
301         fromTargetString,
302         fromTargetString.endsWith(FLATTABLE) ? ZNRecordRow.class.getName() : ZNRecord.class
303             .getName());
304     josqlQuery.parse(josql);
305     QueryResults qr = josqlQuery.execute(fromTargetList);
306     return qr.getResults();
307   }
308 }