View Javadoc

1   package org.apache.helix.spectator;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.Comparator;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.TreeSet;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.apache.helix.ConfigChangeListener;
33  import org.apache.helix.ExternalViewChangeListener;
34  import org.apache.helix.HelixDataAccessor;
35  import org.apache.helix.NotificationContext;
36  import org.apache.helix.PropertyKey.Builder;
37  import org.apache.helix.model.ExternalView;
38  import org.apache.helix.model.InstanceConfig;
39  import org.apache.log4j.Logger;
40  
41  
42  public class RoutingTableProvider implements ExternalViewChangeListener, ConfigChangeListener
43  {
44    private static final Logger logger = Logger.getLogger(RoutingTableProvider.class);
45    private final AtomicReference<RoutingTable> _routingTableRef;
46  
47    public RoutingTableProvider()
48    {
49      _routingTableRef = new AtomicReference<RoutingTableProvider.RoutingTable>(new RoutingTable());
50  
51    }
52  
53    /**
54     * returns the instances for {resource,partition} pair that are in a specific
55     * {state}
56     * 
57     * @param resourceName
58     *          -
59     * @param partitionName
60     * @param state
61     * @return empty list if there is no instance in a given state
62     */
63    public List<InstanceConfig> getInstances(String resourceName, String partitionName, String state)
64    {
65      List<InstanceConfig> instanceList = null;
66      RoutingTable _routingTable = _routingTableRef.get();
67      ResourceInfo resourceInfo = _routingTable.get(resourceName);
68      if (resourceInfo != null)
69      {
70        PartitionInfo keyInfo = resourceInfo.get(partitionName);
71        if (keyInfo != null)
72        {
73          instanceList = keyInfo.get(state);
74        }
75      }
76      if (instanceList == null)
77      {
78        instanceList = Collections.emptyList();
79      }
80      return instanceList;
81    }
82  
83    /**
84     * returns all instances for {resource} that are in a specific {state}
85     * 
86     * @param resource
87     * @param state
88     * @return empty list if there is no instance in a given state
89     */
90    public Set<InstanceConfig> getInstances(String resource, String state)
91    {
92      Set<InstanceConfig> instanceSet = null;
93      RoutingTable routingTable = _routingTableRef.get();
94      ResourceInfo resourceInfo = routingTable.get(resource);
95      if (resourceInfo != null)
96      {
97        instanceSet = resourceInfo.getInstances(state);
98      }
99      if (instanceSet == null)
100     {
101       instanceSet = Collections.emptySet();
102     }
103     return instanceSet;
104   }
105 
106   @Override
107   public void onExternalViewChange(List<ExternalView> externalViewList,
108       NotificationContext changeContext)
109   {
110     // session has expired clean up the routing table
111     if (changeContext.getType() == NotificationContext.Type.FINALIZE)
112     {
113       logger.info("Resetting the routing table. ");
114       RoutingTable newRoutingTable = new RoutingTable();
115       _routingTableRef.set(newRoutingTable);
116       return;
117     }
118     refresh(externalViewList, changeContext);
119   }
120 
121   @Override
122   public void onConfigChange(List<InstanceConfig> configs,
123                              NotificationContext changeContext)
124   {
125     // session has expired clean up the routing table
126     if (changeContext.getType() == NotificationContext.Type.FINALIZE)
127     {
128       logger.info("Resetting the routing table. ");
129       RoutingTable newRoutingTable = new RoutingTable();
130       _routingTableRef.set(newRoutingTable);
131       return;
132     }
133     
134     HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
135     Builder keyBuilder = accessor.keyBuilder();
136     List<ExternalView> externalViewList = accessor.getChildValues(keyBuilder.externalViews());
137     refresh(externalViewList, changeContext);    
138   }
139   
140   private void refresh(List<ExternalView> externalViewList, NotificationContext changeContext)
141   {
142     HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
143     Builder keyBuilder = accessor.keyBuilder();
144     
145     List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs());
146     Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>();
147     for (InstanceConfig config : configList)
148     {
149       instanceConfigMap.put(config.getId(), config);
150     }
151     RoutingTable newRoutingTable = new RoutingTable();
152     if (externalViewList != null)
153     {
154       for (ExternalView extView : externalViewList)
155       {
156         String resourceName = extView.getId();
157         for (String partitionName : extView.getPartitionSet())
158         {
159           Map<String, String> stateMap = extView.getStateMap(partitionName);
160           for (String instanceName : stateMap.keySet())
161           {
162             String currentState = stateMap.get(instanceName);
163             if (instanceConfigMap.containsKey(instanceName))
164             {
165               InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
166               newRoutingTable.addEntry(resourceName, partitionName, currentState, instanceConfig);
167             } else
168             {
169               logger.error("Invalid instance name." + instanceName
170                   + " .Not found in /cluster/configs/. instanceName: ");
171             }
172 
173           }
174         }
175       }
176     }
177     _routingTableRef.set(newRoutingTable);
178   }
179 
180   class RoutingTable
181   {
182     private final HashMap<String, ResourceInfo> resourceInfoMap;
183 
184     public RoutingTable()
185     {
186       resourceInfoMap = new HashMap<String, RoutingTableProvider.ResourceInfo>();
187     }
188 
189     public void addEntry(String resourceName, String partitionName, String state,
190         InstanceConfig config)
191     {
192       if (!resourceInfoMap.containsKey(resourceName))
193       {
194         resourceInfoMap.put(resourceName, new ResourceInfo());
195       }
196       ResourceInfo resourceInfo = resourceInfoMap.get(resourceName);
197       resourceInfo.addEntry(partitionName, state, config);
198 
199     }
200 
201     ResourceInfo get(String resourceName)
202     {
203       return resourceInfoMap.get(resourceName);
204     }
205 
206   }
207 
208   class ResourceInfo
209   {
210     // store PartitionInfo for each partition
211     HashMap<String, PartitionInfo> partitionInfoMap;
212     // stores the Set of Instances in a given state
213     HashMap<String, Set<InstanceConfig>> stateInfoMap;
214 
215     public ResourceInfo()
216     {
217       partitionInfoMap = new HashMap<String, RoutingTableProvider.PartitionInfo>();
218       stateInfoMap = new HashMap<String, Set<InstanceConfig>>();
219     }
220 
221     public void addEntry(String stateUnitKey, String state, InstanceConfig config)
222     {
223       // add
224       if (!stateInfoMap.containsKey(state))
225       {
226         Comparator<InstanceConfig> comparator = new Comparator<InstanceConfig>() {
227 
228           @Override
229           public int compare(InstanceConfig o1, InstanceConfig o2)
230           {
231             if (o1 == o2)
232             {
233               return 0;
234             }
235             if (o1 == null)
236             {
237               return -1;
238             }
239             if (o2 == null)
240             {
241               return 1;
242             }
243 
244             int compareTo = o1.getHostName().compareTo(o2.getHostName());
245             if (compareTo == 0)
246             {
247               return o1.getPort().compareTo(o2.getPort());
248             } else
249             {
250               return compareTo;
251             }
252 
253           }
254         };
255         stateInfoMap.put(state, new TreeSet<InstanceConfig>(comparator));
256       }
257       Set<InstanceConfig> set = stateInfoMap.get(state);
258       set.add(config);
259 
260       if (!partitionInfoMap.containsKey(stateUnitKey))
261       {
262         partitionInfoMap.put(stateUnitKey, new PartitionInfo());
263       }
264       PartitionInfo stateUnitKeyInfo = partitionInfoMap.get(stateUnitKey);
265       stateUnitKeyInfo.addEntry(state, config);
266 
267     }
268 
269     public Set<InstanceConfig> getInstances(String state)
270     {
271       Set<InstanceConfig> instanceSet = stateInfoMap.get(state);
272       return instanceSet;
273     }
274 
275     PartitionInfo get(String stateUnitKey)
276     {
277       return partitionInfoMap.get(stateUnitKey);
278     }
279   }
280 
281   class PartitionInfo
282   {
283     HashMap<String, List<InstanceConfig>> stateInfoMap;
284 
285     public PartitionInfo()
286     {
287       stateInfoMap = new HashMap<String, List<InstanceConfig>>();
288     }
289 
290     public void addEntry(String state, InstanceConfig config)
291     {
292       if (!stateInfoMap.containsKey(state))
293       {
294         stateInfoMap.put(state, new ArrayList<InstanceConfig>());
295       }
296       List<InstanceConfig> list = stateInfoMap.get(state);
297       list.add(config);
298     }
299 
300     List<InstanceConfig> get(String state)
301     {
302       return stateInfoMap.get(state);
303     }
304   }
305 }