View Javadoc

1   package org.apache.helix.controller.stages;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Set;
28  
29  import org.apache.helix.controller.pipeline.AbstractBaseStage;
30  import org.apache.helix.controller.pipeline.StageException;
31  import org.apache.helix.model.ClusterConstraints;
32  import org.apache.helix.model.Message;
33  import org.apache.helix.model.Partition;
34  import org.apache.helix.model.Resource;
35  import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
36  import org.apache.helix.model.ConstraintItem;
37  import org.apache.helix.model.ClusterConstraints.ConstraintType;
38  import org.apache.helix.model.ClusterConstraints.ConstraintValue;
39  import org.apache.log4j.Logger;
40  
41  
42  public class MessageThrottleStage extends AbstractBaseStage
43  {
44    private static final Logger LOG =
45                                        Logger.getLogger(MessageThrottleStage.class.getName());
46  
47    int valueOf(String valueStr)
48    {
49      int value = Integer.MAX_VALUE;
50  
51      try
52      {
53        ConstraintValue valueToken = ConstraintValue.valueOf(valueStr);
54        switch (valueToken)
55        {
56        case ANY:
57          value = Integer.MAX_VALUE;
58          break;
59        default:
60          LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
61              + Integer.MAX_VALUE);
62          break;
63        }
64      }
65      catch (Exception e)
66      {
67        try
68        {
69          value = Integer.parseInt(valueStr);
70        }
71        catch (NumberFormatException ne)
72        {
73          LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
74              + Integer.MAX_VALUE);
75        }
76      }
77      return value;
78    }
79  
80    /**
81     * constraints are selected in the order of the following rules: 1) don't select
82     * constraints with CONSTRAINT_VALUE=ANY; 2) if one constraint is more specific than the
83     * other, select the most specific one 3) if a message matches multiple constraints of
84     * incomparable specificity, select the one with the minimum value 4) if a message
85     * matches multiple constraints of incomparable specificity, and they all have the same
86     * value, select the first in alphabetic order
87     */
88    Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
89                                          Map<ConstraintAttribute, String> attributes)
90    {
91      Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
92      for (ConstraintItem item : items)
93      {
94        // don't select constraints with CONSTRAINT_VALUE=ANY
95        if (item.getConstraintValue().equals(ConstraintValue.ANY.toString()))
96        {
97          continue;
98        }
99  
100       String key = item.filter(attributes).toString();
101       if (!selectedItems.containsKey(key))
102       {
103         selectedItems.put(key, item);
104       }
105       else
106       {
107         ConstraintItem existingItem = selectedItems.get(key);
108         if (existingItem.match(item.getAttributes()))
109         {
110           // item is more specific than existingItem
111           selectedItems.put(key, item);
112         }
113         else if (!item.match(existingItem.getAttributes()))
114         {
115           // existingItem and item are of incomparable specificity
116           int value = valueOf(item.getConstraintValue());
117           int existingValue = valueOf(existingItem.getConstraintValue());
118           if (value < existingValue)
119           {
120             // item's constraint value is less than that of existingItem
121             selectedItems.put(key, item);
122           }
123           else if (value == existingValue)
124           {
125             if (item.toString().compareTo(existingItem.toString()) < 0)
126             {
127               // item is ahead of existingItem in alphabetic order
128               selectedItems.put(key, item);
129             }
130           }
131         }
132       }
133     }
134     return new HashSet<ConstraintItem>(selectedItems.values());
135   }
136 
137   @Override
138   public void process(ClusterEvent event) throws Exception
139   {
140     ClusterDataCache cache = event.getAttribute("ClusterDataCache");
141     MessageSelectionStageOutput msgSelectionOutput =
142         event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
143     Map<String, Resource> resourceMap =
144         event.getAttribute(AttributeName.RESOURCES.toString());
145 
146     if (cache == null || resourceMap == null || msgSelectionOutput == null)
147     {
148       throw new StageException("Missing attributes in event: " + event
149           + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
150     }
151 
152     MessageThrottleStageOutput output = new MessageThrottleStageOutput();
153 
154         ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
155     Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
156 
157     if (constraint != null)
158     {
159       // go through all pending messages, they should be counted but not throttled
160       for (String instance : cache.getLiveInstances().keySet())
161       {
162         throttle(throttleCounterMap,
163                  constraint,
164                  new ArrayList<Message>(cache.getMessages(instance).values()),
165                  false);
166       }
167     }
168 
169     // go through all new messages, throttle if necessary
170     // assume messages should be sorted by state transition priority in messageSelection stage
171     for (String resourceName : resourceMap.keySet())
172     {
173       Resource resource = resourceMap.get(resourceName);
174       for (Partition partition : resource.getPartitions())
175       {
176         List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
177         if (constraint != null && messages != null && messages.size() > 0)
178         {
179           messages = throttle(throttleCounterMap, constraint, messages, true);
180         }
181         output.addMessages(resourceName, partition, messages);
182       }
183     }
184 
185     event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
186   }
187 
188   private List<Message> throttle(Map<String, Integer> throttleMap,
189                                  ClusterConstraints constraint,
190                                  List<Message> messages,
191                                  final boolean needThrottle)
192   {
193   
194     List<Message> throttleOutputMsgs = new ArrayList<Message>();
195     for (Message message : messages)
196     {
197       Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
198 
199       Set<ConstraintItem> matches = constraint.match(msgAttr);
200       matches = selectConstraints(matches, msgAttr);
201 
202       boolean msgThrottled = false;
203       for (ConstraintItem item : matches)
204       {
205         String key = item.filter(msgAttr).toString();
206         if (!throttleMap.containsKey(key))
207         {
208           throttleMap.put(key, valueOf(item.getConstraintValue()));
209         }
210         int value = throttleMap.get(key);
211         throttleMap.put(key, --value);
212 
213         if (needThrottle && value < 0)
214         {
215           msgThrottled = true;
216           
217           if (LOG.isDebugEnabled())
218           {
219             // TODO: printout constraint item that throttles the message
220             LOG.debug("message: " + message + " is throttled by constraint: " + item);
221           }
222         }
223       }
224 
225       if (!msgThrottled)
226       {
227         throttleOutputMsgs.add(message);
228       }
229     }
230 
231     return throttleOutputMsgs;
232   }
233 }