1 package org.apache.helix.controller.stages;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28
29 import org.apache.helix.controller.pipeline.AbstractBaseStage;
30 import org.apache.helix.controller.pipeline.StageException;
31 import org.apache.helix.model.ClusterConstraints;
32 import org.apache.helix.model.Message;
33 import org.apache.helix.model.Partition;
34 import org.apache.helix.model.Resource;
35 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
36 import org.apache.helix.model.ConstraintItem;
37 import org.apache.helix.model.ClusterConstraints.ConstraintType;
38 import org.apache.helix.model.ClusterConstraints.ConstraintValue;
39 import org.apache.log4j.Logger;
40
41
42 public class MessageThrottleStage extends AbstractBaseStage
43 {
44 private static final Logger LOG =
45 Logger.getLogger(MessageThrottleStage.class.getName());
46
47 int valueOf(String valueStr)
48 {
49 int value = Integer.MAX_VALUE;
50
51 try
52 {
53 ConstraintValue valueToken = ConstraintValue.valueOf(valueStr);
54 switch (valueToken)
55 {
56 case ANY:
57 value = Integer.MAX_VALUE;
58 break;
59 default:
60 LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
61 + Integer.MAX_VALUE);
62 break;
63 }
64 }
65 catch (Exception e)
66 {
67 try
68 {
69 value = Integer.parseInt(valueStr);
70 }
71 catch (NumberFormatException ne)
72 {
73 LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
74 + Integer.MAX_VALUE);
75 }
76 }
77 return value;
78 }
79
80
81
82
83
84
85
86
87
88 Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
89 Map<ConstraintAttribute, String> attributes)
90 {
91 Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
92 for (ConstraintItem item : items)
93 {
94
95 if (item.getConstraintValue().equals(ConstraintValue.ANY.toString()))
96 {
97 continue;
98 }
99
100 String key = item.filter(attributes).toString();
101 if (!selectedItems.containsKey(key))
102 {
103 selectedItems.put(key, item);
104 }
105 else
106 {
107 ConstraintItem existingItem = selectedItems.get(key);
108 if (existingItem.match(item.getAttributes()))
109 {
110
111 selectedItems.put(key, item);
112 }
113 else if (!item.match(existingItem.getAttributes()))
114 {
115
116 int value = valueOf(item.getConstraintValue());
117 int existingValue = valueOf(existingItem.getConstraintValue());
118 if (value < existingValue)
119 {
120
121 selectedItems.put(key, item);
122 }
123 else if (value == existingValue)
124 {
125 if (item.toString().compareTo(existingItem.toString()) < 0)
126 {
127
128 selectedItems.put(key, item);
129 }
130 }
131 }
132 }
133 }
134 return new HashSet<ConstraintItem>(selectedItems.values());
135 }
136
137 @Override
138 public void process(ClusterEvent event) throws Exception
139 {
140 ClusterDataCache cache = event.getAttribute("ClusterDataCache");
141 MessageSelectionStageOutput msgSelectionOutput =
142 event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
143 Map<String, Resource> resourceMap =
144 event.getAttribute(AttributeName.RESOURCES.toString());
145
146 if (cache == null || resourceMap == null || msgSelectionOutput == null)
147 {
148 throw new StageException("Missing attributes in event: " + event
149 + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
150 }
151
152 MessageThrottleStageOutput output = new MessageThrottleStageOutput();
153
154 ClusterConstraints constraint = cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
155 Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
156
157 if (constraint != null)
158 {
159
160 for (String instance : cache.getLiveInstances().keySet())
161 {
162 throttle(throttleCounterMap,
163 constraint,
164 new ArrayList<Message>(cache.getMessages(instance).values()),
165 false);
166 }
167 }
168
169
170
171 for (String resourceName : resourceMap.keySet())
172 {
173 Resource resource = resourceMap.get(resourceName);
174 for (Partition partition : resource.getPartitions())
175 {
176 List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
177 if (constraint != null && messages != null && messages.size() > 0)
178 {
179 messages = throttle(throttleCounterMap, constraint, messages, true);
180 }
181 output.addMessages(resourceName, partition, messages);
182 }
183 }
184
185 event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
186 }
187
188 private List<Message> throttle(Map<String, Integer> throttleMap,
189 ClusterConstraints constraint,
190 List<Message> messages,
191 final boolean needThrottle)
192 {
193
194 List<Message> throttleOutputMsgs = new ArrayList<Message>();
195 for (Message message : messages)
196 {
197 Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
198
199 Set<ConstraintItem> matches = constraint.match(msgAttr);
200 matches = selectConstraints(matches, msgAttr);
201
202 boolean msgThrottled = false;
203 for (ConstraintItem item : matches)
204 {
205 String key = item.filter(msgAttr).toString();
206 if (!throttleMap.containsKey(key))
207 {
208 throttleMap.put(key, valueOf(item.getConstraintValue()));
209 }
210 int value = throttleMap.get(key);
211 throttleMap.put(key, --value);
212
213 if (needThrottle && value < 0)
214 {
215 msgThrottled = true;
216
217 if (LOG.isDebugEnabled())
218 {
219
220 LOG.debug("message: " + message + " is throttled by constraint: " + item);
221 }
222 }
223 }
224
225 if (!msgThrottled)
226 {
227 throttleOutputMsgs.add(message);
228 }
229 }
230
231 return throttleOutputMsgs;
232 }
233 }