View Javadoc

1   package org.apache.helix.controller.stages;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Date;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.TreeMap;
28  
29  import org.apache.helix.HelixDataAccessor;
30  import org.apache.helix.HelixManager;
31  import org.apache.helix.ZNRecord;
32  import org.apache.helix.ZkUnitTestBase;
33  import org.apache.helix.PropertyKey.Builder;
34  import org.apache.helix.controller.pipeline.Pipeline;
35  import org.apache.helix.controller.stages.AttributeName;
36  import org.apache.helix.controller.stages.ClusterEvent;
37  import org.apache.helix.controller.stages.MessageSelectionStageOutput;
38  import org.apache.helix.controller.stages.MessageThrottleStage;
39  import org.apache.helix.controller.stages.MessageThrottleStageOutput;
40  import org.apache.helix.controller.stages.ReadClusterDataStage;
41  import org.apache.helix.controller.stages.ResourceComputationStage;
42  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
43  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
44  import org.apache.helix.model.ClusterConstraints;
45  import org.apache.helix.model.ConstraintItem;
46  import org.apache.helix.model.Message;
47  import org.apache.helix.model.Partition;
48  import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
49  import org.apache.helix.model.ClusterConstraints.ConstraintType;
50  import org.apache.helix.model.Message.MessageType;
51  import org.apache.log4j.Logger;
52  import org.testng.Assert;
53  import org.testng.annotations.Test;
54  
55  
56  public class TestMessageThrottleStage extends ZkUnitTestBase
57  {
58    private static final Logger LOG        =
59                                               Logger.getLogger(TestMessageThrottleStage.class.getName());
60    final String                _className = getShortClassName();
61  
62    @Test
63    public void testMsgThrottleBasic() throws Exception
64    {
65      String clusterName = "CLUSTER_" + _className + "_basic";
66      System.out.println("START " + clusterName + " at "
67          + new Date(System.currentTimeMillis()));
68  
69      HelixDataAccessor accessor =
70          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
71      HelixManager manager = new DummyClusterManager(clusterName, accessor);
72  
73      // ideal state: node0 is MASTER, node1 is SLAVE
74      // replica=2 means 1 master and 1 slave
75      setupIdealState(clusterName, new int[] { 0, 1 }, new String[] { "TestDB" }, 1, 2);
76      setupLiveInstances(clusterName, new int[] { 0, 1 });
77      setupStateModel(clusterName);
78  
79      ClusterEvent event = new ClusterEvent("testEvent");
80      event.addAttribute("helixmanager", manager);
81  
82      MessageThrottleStage throttleStage = new MessageThrottleStage();
83      try
84      {
85        runStage(event, throttleStage);
86        Assert.fail("Should throw exception since DATA_CACHE is null");
87      }
88      catch (Exception e)
89      {
90        // OK
91      }
92  
93      Pipeline dataRefresh = new Pipeline();
94      dataRefresh.addStage(new ReadClusterDataStage());
95      runPipeline(event, dataRefresh);
96  
97      try
98      {
99        runStage(event, throttleStage);
100       Assert.fail("Should throw exception since RESOURCE is null");
101     }
102     catch (Exception e)
103     {
104       // OK
105     }
106     runStage(event, new ResourceComputationStage());
107 
108     try
109     {
110       runStage(event, throttleStage);
111       Assert.fail("Should throw exception since MESSAGE_SELECT is null");
112     }
113     catch (Exception e)
114     {
115       // OK
116     }
117     MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
118     List<Message> selectMessages = new ArrayList<Message>();
119     Message msg =
120         createMessage(MessageType.STATE_TRANSITION,
121                       "msgId-001",
122                       "OFFLINE",
123                       "SLAVE",
124                       "TestDB",
125                       "localhost_0");
126     selectMessages.add(msg);
127 
128     msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
129     event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
130 
131     runStage(event, throttleStage);
132 
133     MessageThrottleStageOutput msgThrottleOutput =
134         event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
135     Assert.assertEquals(msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"))
136                                          .size(),
137                         1);
138 
139     System.out.println("END " + clusterName + " at "
140         + new Date(System.currentTimeMillis()));
141 
142   }
143 
144   @Test()
145   public void testMsgThrottleConstraints() throws Exception
146   {
147     String clusterName = "CLUSTER_" + _className + "_constraints";
148     System.out.println("START " + clusterName + " at "
149         + new Date(System.currentTimeMillis()));
150 
151     HelixDataAccessor accessor =
152         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
153     HelixManager manager = new DummyClusterManager(clusterName, accessor);
154 
155     // ideal state: node0 is MASTER, node1 is SLAVE
156     // replica=2 means 1 master and 1 slave
157     setupIdealState(clusterName, new int[] { 0, 1 }, new String[] { "TestDB" }, 1, 2);
158     setupLiveInstances(clusterName, new int[] { 0, 1 });
159     setupStateModel(clusterName);
160 
161     // setup constraints
162     ZNRecord record = new ZNRecord(ConstraintType.MESSAGE_CONSTRAINT.toString());
163 
164     // constraint0:
165     // "MESSAGE_TYPE=STATE_TRANSITION,CONSTRAINT_VALUE=ANY"
166     record.setMapField("constraint0", new TreeMap<String, String>());
167     record.getMapField("constraint0").put("MESSAGE_TYPE", "STATE_TRANSITION");
168     record.getMapField("constraint0").put("CONSTRAINT_VALUE", "ANY");
169     ConstraintItem constraint0 = new ConstraintItem(record.getMapField("constraint0"));
170 
171     // constraint1:
172     // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,CONSTRAINT_VALUE=ANY"
173     record.setMapField("constraint1", new TreeMap<String, String>());
174     record.getMapField("constraint1").put("MESSAGE_TYPE", "STATE_TRANSITION");
175     record.getMapField("constraint1").put("TRANSITION", "OFFLINE-SLAVE");
176     record.getMapField("constraint1").put("CONSTRAINT_VALUE", "50");
177     ConstraintItem constraint1 = new ConstraintItem(record.getMapField("constraint1"));
178 
179     // constraint2:
180     // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=.*,RESOURCE=TestDB,CONSTRAINT_VALUE=2";
181     record.setMapField("constraint2", new TreeMap<String, String>());
182     record.getMapField("constraint2").put("MESSAGE_TYPE", "STATE_TRANSITION");
183     record.getMapField("constraint2").put("TRANSITION", "OFFLINE-SLAVE");
184     record.getMapField("constraint2").put("INSTANCE", ".*");
185     record.getMapField("constraint2").put("RESOURCE", "TestDB");
186     record.getMapField("constraint2").put("CONSTRAINT_VALUE", "2");
187     ConstraintItem constraint2 = new ConstraintItem(record.getMapField("constraint2"));
188 
189     // constraint3:
190     // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=localhost_12918,RESOURCE=.*,CONSTRAINT_VALUE=1";
191     record.setMapField("constraint3", new TreeMap<String, String>());
192     record.getMapField("constraint3").put("MESSAGE_TYPE", "STATE_TRANSITION");
193     record.getMapField("constraint3").put("TRANSITION", "OFFLINE-SLAVE");
194     record.getMapField("constraint3").put("INSTANCE", "localhost_1");
195     record.getMapField("constraint3").put("RESOURCE", ".*");
196     record.getMapField("constraint3").put("CONSTRAINT_VALUE", "1");
197     ConstraintItem constraint3 = new ConstraintItem(record.getMapField("constraint3"));
198 
199     // constraint4:
200     // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=.*,RESOURCE=.*,CONSTRAINT_VALUE=10"
201     record.setMapField("constraint4", new TreeMap<String, String>());
202     record.getMapField("constraint4").put("MESSAGE_TYPE", "STATE_TRANSITION");
203     record.getMapField("constraint4").put("TRANSITION", "OFFLINE-SLAVE");
204     record.getMapField("constraint4").put("INSTANCE", ".*");
205     record.getMapField("constraint4").put("RESOURCE", ".*");
206     record.getMapField("constraint4").put("CONSTRAINT_VALUE", "10");
207     ConstraintItem constraint4 = new ConstraintItem(record.getMapField("constraint4"));
208 
209     // constraint5:
210     // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=localhost_12918,RESOURCE=TestDB,CONSTRAINT_VALUE=5"
211     record.setMapField("constraint5", new TreeMap<String, String>());
212     record.getMapField("constraint5").put("MESSAGE_TYPE", "STATE_TRANSITION");
213     record.getMapField("constraint5").put("TRANSITION", "OFFLINE-SLAVE");
214     record.getMapField("constraint5").put("INSTANCE", "localhost_0");
215     record.getMapField("constraint5").put("RESOURCE", "TestDB");
216     record.getMapField("constraint5").put("CONSTRAINT_VALUE", "3");
217     ConstraintItem constraint5 = new ConstraintItem(record.getMapField("constraint5"));
218 
219     Builder keyBuilder = accessor.keyBuilder();
220     accessor.setProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()),
221                          new ClusterConstraints(record));
222 
223     // ClusterConstraints constraint =
224     // accessor.getProperty(ClusterConstraints.class,
225     // PropertyType.CONFIGS,
226     // ConfigScopeProperty.CONSTRAINT.toString(),
227     // ConstraintType.MESSAGE_CONSTRAINT.toString());
228     ClusterConstraints constraint =
229         accessor.getProperty(keyBuilder.constraint(ConstraintType.MESSAGE_CONSTRAINT.toString()));
230 
231     MessageThrottleStage throttleStage = new MessageThrottleStage();
232 
233     // test constraintSelection
234     // message1: hit contraintSelection rule1 and rule2
235     Message msg1 =
236         createMessage(MessageType.STATE_TRANSITION,
237                       "msgId-001",
238                       "OFFLINE",
239                       "SLAVE",
240                       "TestDB",
241                       "localhost_0");
242 
243     Map<ConstraintAttribute, String> msgAttr =
244         ClusterConstraints.toConstraintAttributes(msg1);
245     Set<ConstraintItem> matches = constraint.match(msgAttr);
246     System.out.println(msg1 + " matches(" + matches.size() + "): " + matches);
247     Assert.assertEquals(matches.size(), 5);
248     Assert.assertTrue(containsConstraint(matches, constraint0));
249     Assert.assertTrue(containsConstraint(matches, constraint1));
250     Assert.assertTrue(containsConstraint(matches, constraint2));
251     Assert.assertTrue(containsConstraint(matches, constraint4));
252     Assert.assertTrue(containsConstraint(matches, constraint5));
253 
254     matches = throttleStage.selectConstraints(matches, msgAttr);
255     System.out.println(msg1 + " matches(" + matches.size() + "): " + matches);
256     Assert.assertEquals(matches.size(), 2);
257     Assert.assertTrue(containsConstraint(matches, constraint1));
258     Assert.assertTrue(containsConstraint(matches, constraint5));
259 
260     // message2: hit contraintSelection rule1, rule2, and rule3
261     Message msg2 =
262         createMessage(MessageType.STATE_TRANSITION,
263                       "msgId-002",
264                       "OFFLINE",
265                       "SLAVE",
266                       "TestDB",
267                       "localhost_1");
268 
269     msgAttr = ClusterConstraints.toConstraintAttributes(msg2);
270     matches = constraint.match(msgAttr);
271     System.out.println(msg2 + " matches(" + matches.size() + "): " + matches);
272     Assert.assertEquals(matches.size(), 5);
273     Assert.assertTrue(containsConstraint(matches, constraint0));
274     Assert.assertTrue(containsConstraint(matches, constraint1));
275     Assert.assertTrue(containsConstraint(matches, constraint2));
276     Assert.assertTrue(containsConstraint(matches, constraint3));
277     Assert.assertTrue(containsConstraint(matches, constraint4));
278 
279     matches = throttleStage.selectConstraints(matches, msgAttr);
280     System.out.println(msg2 + " matches(" + matches.size() + "): " + matches);
281     Assert.assertEquals(matches.size(), 2);
282     Assert.assertTrue(containsConstraint(matches, constraint1));
283     Assert.assertTrue(containsConstraint(matches, constraint3));
284 
285     // test messageThrottleStage
286     ClusterEvent event = new ClusterEvent("testEvent");
287     event.addAttribute("helixmanager", manager);
288 
289     Pipeline dataRefresh = new Pipeline();
290     dataRefresh.addStage(new ReadClusterDataStage());
291     runPipeline(event, dataRefresh);
292     runStage(event, new ResourceComputationStage());
293     MessageSelectionStageOutput msgSelectOutput = new MessageSelectionStageOutput();
294 
295     Message msg3 =
296         createMessage(MessageType.STATE_TRANSITION,
297                       "msgId-003",
298                       "OFFLINE",
299                       "SLAVE",
300                       "TestDB",
301                       "localhost_0");
302 
303     Message msg4 =
304         createMessage(MessageType.STATE_TRANSITION,
305                       "msgId-004",
306                       "OFFLINE",
307                       "SLAVE",
308                       "TestDB",
309                       "localhost_0");
310 
311     Message msg5 =
312         createMessage(MessageType.STATE_TRANSITION,
313                       "msgId-005",
314                       "OFFLINE",
315                       "SLAVE",
316                       "TestDB",
317                       "localhost_0");
318 
319     Message msg6 =
320         createMessage(MessageType.STATE_TRANSITION,
321                       "msgId-006",
322                       "OFFLINE",
323                       "SLAVE",
324                       "TestDB",
325                       "localhost_1");
326 
327     List<Message> selectMessages = new ArrayList<Message>();
328     selectMessages.add(msg1);
329     selectMessages.add(msg2);
330     selectMessages.add(msg3);
331     selectMessages.add(msg4);
332     selectMessages.add(msg5); // should be throttled
333     selectMessages.add(msg6); // should be throttled
334 
335     msgSelectOutput.addMessages("TestDB", new Partition("TestDB_0"), selectMessages);
336     event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), msgSelectOutput);
337 
338     runStage(event, throttleStage);
339 
340     MessageThrottleStageOutput msgThrottleOutput =
341         event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
342     List<Message> throttleMessages =
343         msgThrottleOutput.getMessages("TestDB", new Partition("TestDB_0"));
344     Assert.assertEquals(throttleMessages.size(), 4);
345     Assert.assertTrue(throttleMessages.contains(msg1));
346     Assert.assertTrue(throttleMessages.contains(msg2));
347     Assert.assertTrue(throttleMessages.contains(msg3));
348     Assert.assertTrue(throttleMessages.contains(msg4));
349 
350     System.out.println("END " + clusterName + " at "
351         + new Date(System.currentTimeMillis()));
352 
353   }
354 
355   private boolean containsConstraint(Set<ConstraintItem> constraints,
356                                      ConstraintItem constraint)
357   {
358     for (ConstraintItem item : constraints)
359     {
360       if (item.toString().equals(constraint.toString()))
361       {
362         return true;
363       }
364     }
365     return false;
366   }
367 
368   // add pending message test case
369 }