1 package org.apache.helix.controller.stages;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.TreeMap;
28
29 import org.apache.helix.HelixDataAccessor;
30 import org.apache.helix.HelixManager;
31 import org.apache.helix.ZNRecord;
32 import org.apache.helix.ZkUnitTestBase;
33 import org.apache.helix.PropertyKey.Builder;
34 import org.apache.helix.controller.pipeline.Pipeline;
35 import org.apache.helix.controller.stages.AttributeName;
36 import org.apache.helix.controller.stages.ClusterEvent;
37 import org.apache.helix.controller.stages.MessageSelectionStageOutput;
38 import org.apache.helix.controller.stages.MessageThrottleStage;
39 import org.apache.helix.controller.stages.MessageThrottleStageOutput;
40 import org.apache.helix.controller.stages.ReadClusterDataStage;
41 import org.apache.helix.controller.stages.ResourceComputationStage;
42 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
43 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
44 import org.apache.helix.model.ClusterConstraints;
45 import org.apache.helix.model.ConstraintItem;
46 import org.apache.helix.model.Message;
47 import org.apache.helix.model.Partition;
48 import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
49 import org.apache.helix.model.ClusterConstraints.ConstraintType;
50 import org.apache.helix.model.Message.MessageType;
51 import org.apache.log4j.Logger;
52 import org.testng.Assert;
53 import org.testng.annotations.Test;
54
55
56 public class TestMessageThrottleStage extends ZkUnitTestBase
57 {
58 private static final Logger LOG =
59 Logger.getLogger(TestMessageThrottleStage.class.getName());
60 final String _className = getShortClassName();
61
62 @Test
63 public void testMsgThrottleBasic() throws Exception
64 {
65 String clusterName = "CLUSTER_" + _className + "_basic";
66 System.out.println("START " + clusterName + " at "
67 + new Date(System.currentTimeMillis()));
68
69 HelixDataAccessor accessor =
70 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
71 HelixManager manager = new DummyClusterManager(clusterName, accessor);
72
73
74
75 setupIdealState(clusterName, new int[] { 0, 1 }, new String[] { "TestDB" }, 1, 2);
76 setupLiveInstances(clusterName, new int[] { 0, 1 });
77 setupStateModel(clusterName);
78
79 ClusterEvent event = new ClusterEvent("testEvent");
80 event.addAttribute("helixmanager", manager);
81
82 MessageThrottleStage throttleStage = new MessageThrottleStage();
83 try
84 {
85 runStage(event, throttleStage);
86 Assert.fail("Should throw exception since DATA_CACHE is null");
87 }
88 catch (Exception e)
89 {
90
91 }
92
93 Pipeline dataRefresh = new Pipeline();
94 dataRefresh.addStage(new ReadClusterDataStage());
95 runPipeline(event, dataRefresh);
96
97 try
98 {
99 runStage(event, throttleStage);
100 Assert.fail("Should throw exception since RESOURCE is null");
101 }
102 catch (Exception e)
103 {
104
105 }
106 runStage(event, new ResourceComputationStage());
107
108 try
109 {
110 runStage(event, throttleStage);
111 Assert.fail("Should throw exception since MESSAGE_SELECT is null");
112 }
113 catch (Exception e)
114 {
115
116 }
117 MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
118 List<Message> selectMessages = new ArrayList<Message>();
119 Message msg =
120 createMessage(MessageType.STATE_TRANSITION,
121 "msgId-001",
122 "OFFLINE",
123 "SLAVE",
124 "TestDB",
125 "localhost_0");
126 selectMessages.add(msg);
127
128 msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
129 event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
130
131 runStage(event, throttleStage);
132
133 MessageThrottleStageOutput msgThrottleOutput =
134 event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
135 Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"))
136 .size(),
137 1);
138
139 System.out.println("END " + clusterName + " at "
140 + new Date(System.currentTimeMillis()));
141
142 }
143
144 @Test()
145 public void testMsgThrottleConstraints() throws Exception
146 {
147 String clusterName = "CLUSTER_" + _className + "_constraints";
148 System.out.println("START " + clusterName + " at "
149 + new Date(System.currentTimeMillis()));
150
151 HelixDataAccessor accessor =
152 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
153 HelixManager manager = new DummyClusterManager(clusterName, accessor);
154
155
156
157 setupIdealState(clusterName, new int[] { 0, 1 }, new String[] { "TestDB" }, 1, 2);
158 setupLiveInstances(clusterName, new int[] { 0, 1 });
159 setupStateModel(clusterName);
160
161
162 ZNRecord record = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString());
163
164
165
166 record.setMapField("constraint0", new TreeMap<String, String>());
167 record.getMapField("constraint0").put("MESSAGE_TYPE", "STATE_TRANSITION");
168 record.getMapField("constraint0").put("CONSTRAINT_VALUE", "ANY");
169 ConstraintItem constraint0 = new ConstraintItem(record.getMapField("constraint0"));
170
171
172
173 record.setMapField("constraint1", new TreeMap<String, String>());
174 record.getMapField("constraint1").put("MESSAGE_TYPE", "STATE_TRANSITION");
175 record.getMapField("constraint1").put("TRANSITION", "OFFLINE-SLAVE");
176 record.getMapField("constraint1").put("CONSTRAINT_VALUE", "50");
177 ConstraintItem constraint1 = new ConstraintItem(record.getMapField("constraint1"));
178
179
180
181 record.setMapField("constraint2", new TreeMap<String, String>());
182 record.getMapField("constraint2").put("MESSAGE_TYPE", "STATE_TRANSITION");
183 record.getMapField("constraint2").put("TRANSITION", "OFFLINE-SLAVE");
184 record.getMapField("constraint2").put("INSTANCE", ".*");
185 record.getMapField("constraint2").put("RESOURCE", "TestDB");
186 record.getMapField("constraint2").put("CONSTRAINT_VALUE", "2");
187 ConstraintItem constraint2 = new ConstraintItem(record.getMapField("constraint2"));
188
189
190
191 record.setMapField("constraint3", new TreeMap<String, String>());
192 record.getMapField("constraint3").put("MESSAGE_TYPE", "STATE_TRANSITION");
193 record.getMapField("constraint3").put("TRANSITION", "OFFLINE-SLAVE");
194 record.getMapField("constraint3").put("INSTANCE", "localhost_1");
195 record.getMapField("constraint3").put("RESOURCE", ".*");
196 record.getMapField("constraint3").put("CONSTRAINT_VALUE", "1");
197 ConstraintItem constraint3 = new ConstraintItem(record.getMapField("constraint3"));
198
199
200
201 record.setMapField("constraint4", new TreeMap<String, String>());
202 record.getMapField("constraint4").put("MESSAGE_TYPE", "STATE_TRANSITION");
203 record.getMapField("constraint4").put("TRANSITION", "OFFLINE-SLAVE");
204 record.getMapField("constraint4").put("INSTANCE", ".*");
205 record.getMapField("constraint4").put("RESOURCE", ".*");
206 record.getMapField("constraint4").put("CONSTRAINT_VALUE", "10");
207 ConstraintItem constraint4 = new ConstraintItem(record.getMapField("constraint4"));
208
209
210
211 record.setMapField("constraint5", new TreeMap<String, String>());
212 record.getMapField("constraint5").put("MESSAGE_TYPE", "STATE_TRANSITION");
213 record.getMapField("constraint5").put("TRANSITION", "OFFLINE-SLAVE");
214 record.getMapField("constraint5").put("INSTANCE", "localhost_0");
215 record.getMapField("constraint5").put("RESOURCE", "TestDB");
216 record.getMapField("constraint5").put("CONSTRAINT_VALUE", "3");
217 ConstraintItem constraint5 = new ConstraintItem(record.getMapField("constraint5"));
218
219 Builder keyBuilder = accessor.keyBuilder();
220 accessor.setProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()),
221 new ClusterConstraints(record));
222
223
224
225
226
227
228 ClusterConstraints constraint =
229 accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
230
231 MessageThrottleStage throttleStage = new MessageThrottleStage();
232
233
234
235 Message msg1 =
236 createMessage(MessageType.STATE_TRANSITION,
237 "msgId-001",
238 "OFFLINE",
239 "SLAVE",
240 "TestDB",
241 "localhost_0");
242
243 Map<ConstraintAttribute, String> msgAttr =
244 ClusterConstraints.toConstraintAttributes(msg1);
245 Set<ConstraintItem> matches = constraint.match(msgAttr);
246 System.out.println(msg1 + " matches(" + matches.size() + "): " + matches);
247 Assert.assertEquals(matches.size(), 5);
248 Assert.assertTrue(containsConstraint(matches, constraint0));
249 Assert.assertTrue(containsConstraint(matches, constraint1));
250 Assert.assertTrue(containsConstraint(matches, constraint2));
251 Assert.assertTrue(containsConstraint(matches, constraint4));
252 Assert.assertTrue(containsConstraint(matches, constraint5));
253
254 matches = throttleStage.selectConstraints(matches, msgAttr);
255 System.out.println(msg1 + " matches(" + matches.size() + "): " + matches);
256 Assert.assertEquals(matches.size(), 2);
257 Assert.assertTrue(containsConstraint(matches, constraint1));
258 Assert.assertTrue(containsConstraint(matches, constraint5));
259
260
261 Message msg2 =
262 createMessage(MessageType.STATE_TRANSITION,
263 "msgId-002",
264 "OFFLINE",
265 "SLAVE",
266 "TestDB",
267 "localhost_1");
268
269 msgAttr = ClusterConstraints.toConstraintAttributes(msg2);
270 matches = constraint.match(msgAttr);
271 System.out.println(msg2 + " matches(" + matches.size() + "): " + matches);
272 Assert.assertEquals(matches.size(), 5);
273 Assert.assertTrue(containsConstraint(matches, constraint0));
274 Assert.assertTrue(containsConstraint(matches, constraint1));
275 Assert.assertTrue(containsConstraint(matches, constraint2));
276 Assert.assertTrue(containsConstraint(matches, constraint3));
277 Assert.assertTrue(containsConstraint(matches, constraint4));
278
279 matches = throttleStage.selectConstraints(matches, msgAttr);
280 System.out.println(msg2 + " matches(" + matches.size() + "): " + matches);
281 Assert.assertEquals(matches.size(), 2);
282 Assert.assertTrue(containsConstraint(matches, constraint1));
283 Assert.assertTrue(containsConstraint(matches, constraint3));
284
285
286 ClusterEvent event = new ClusterEvent("testEvent");
287 event.addAttribute("helixmanager", manager);
288
289 Pipeline dataRefresh = new Pipeline();
290 dataRefresh.addStage(new ReadClusterDataStage());
291 runPipeline(event, dataRefresh);
292 runStage(event, new ResourceComputationStage());
293 MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
294
295 Message msg3 =
296 createMessage(MessageType.STATE_TRANSITION,
297 "msgId-003",
298 "OFFLINE",
299 "SLAVE",
300 "TestDB",
301 "localhost_0");
302
303 Message msg4 =
304 createMessage(MessageType.STATE_TRANSITION,
305 "msgId-004",
306 "OFFLINE",
307 "SLAVE",
308 "TestDB",
309 "localhost_0");
310
311 Message msg5 =
312 createMessage(MessageType.STATE_TRANSITION,
313 "msgId-005",
314 "OFFLINE",
315 "SLAVE",
316 "TestDB",
317 "localhost_0");
318
319 Message msg6 =
320 createMessage(MessageType.STATE_TRANSITION,
321 "msgId-006",
322 "OFFLINE",
323 "SLAVE",
324 "TestDB",
325 "localhost_1");
326
327 List<Message> selectMessages = new ArrayList<Message>();
328 selectMessages.add(msg1);
329 selectMessages.add(msg2);
330 selectMessages.add(msg3);
331 selectMessages.add(msg4);
332 selectMessages.add(msg5);
333 selectMessages.add(msg6);
334
335 msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
336 event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
337
338 runStage(event, throttleStage);
339
340 MessageThrottleStageOutput msgThrottleOutput =
341 event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
342 List<Message> throttleMessages =
343 msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"));
344 Assert.assertEquals(throttleMessages.size(), 4);
345 Assert.assertTrue(throttleMessages.contains(msg1));
346 Assert.assertTrue(throttleMessages.contains(msg2));
347 Assert.assertTrue(throttleMessages.contains(msg3));
348 Assert.assertTrue(throttleMessages.contains(msg4));
349
350 System.out.println("END " + clusterName + " at "
351 + new Date(System.currentTimeMillis()));
352
353 }
354
355 private boolean containsConstraint(Set<ConstraintItem> constraints,
356 ConstraintItem constraint)
357 {
358 for (ConstraintItem item : constraints)
359 {
360 if (item.toString().equals(constraint.toString()))
361 {
362 return true;
363 }
364 }
365 return false;
366 }
367
368
369 }