View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.TreeMap;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  
28  import org.I0Itec.zkclient.IZkChildListener;
29  import org.apache.helix.HelixAdmin;
30  import org.apache.helix.PropertyPathConfig;
31  import org.apache.helix.PropertyType;
32  import org.apache.helix.TestHelper;
33  import org.apache.helix.ZNRecord;
34  import org.apache.helix.PropertyKey.Builder;
35  import org.apache.helix.controller.HelixControllerMain;
36  import org.apache.helix.manager.zk.ZKHelixAdmin;
37  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
38  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
39  import org.apache.helix.mock.participant.MockParticipant;
40  import org.apache.helix.model.ClusterConstraints;
41  import org.apache.helix.model.ConstraintItem;
42  import org.apache.helix.model.Message;
43  import org.apache.helix.model.ClusterConstraints.ConstraintType;
44  import org.apache.helix.model.builder.ClusterConstraintsBuilder;
45  import org.apache.helix.model.builder.ConstraintItemBuilder;
46  import org.apache.helix.tools.ClusterStateVerifier;
47  import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
48  import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
49  import org.testng.Assert;
50  import org.testng.annotations.Test;
51  
52  
53  public class TestMessageThrottle extends ZkIntegrationTestBase
54  {
55    @Test()
56    public void testMessageThrottle() throws Exception
57    {
58      // Logger.getRootLogger().setLevel(Level.INFO);
59  
60      String clusterName = getShortClassName();
61      MockParticipant[] participants = new MockParticipant[5];
62      // ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
63  
64      System.out.println("START " + clusterName + " at "
65          + new Date(System.currentTimeMillis()));
66  
67      TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start
68                                                           // port
69                              "localhost", // participant name prefix
70                              "TestDB", // resource name prefix
71                              1, // resources
72                              10, // partitions per resource
73                              5, // number of nodes
74                              3, // replicas
75                              "MasterSlave",
76                              true); // do rebalance
77  
78      // setup message constraint
79      // "MESSAGE_TYPE=STATE_TRANSITION,TRANSITION=OFFLINE-SLAVE,INSTANCE=.*,CONSTRAINT_VALUE=1";
80      HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
81      ConstraintItemBuilder builder = new ConstraintItemBuilder();
82      builder.addConstraintAttribute("MESSAGE_TYPE", "STATE_TRANSITION")
83             .addConstraintAttribute("INSTANCE", ".*")
84             .addConstraintAttribute("CONSTRAINT_VALUE", "1");
85      
86  //    Map<String, String> constraints = new TreeMap<String, String>();
87  //    constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
88  //    // constraints.put("TRANSITION", "OFFLINE-SLAVE");
89  //    constraints.put("CONSTRAINT_VALUE", "1");
90  //    constraints.put("INSTANCE", ".*");
91      admin.setConstraint(clusterName, ConstraintType.MESSAGE_CONSTRAINT, "constraint1", builder.build());
92      
93  
94      final ZKHelixDataAccessor accessor =
95          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
96  
97      // make sure we never see more than 1 state transition message for each participant
98      final AtomicBoolean success = new AtomicBoolean(true);
99      for (int i = 0; i < 5; i++)
100     {
101       String instanceName = "localhost_" + (12918 + i);
102       String msgPath =
103           PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, instanceName);
104 
105       _gZkClient.subscribeChildChanges(msgPath, new IZkChildListener()
106       {
107 
108         @Override
109         public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
110         {
111           if (currentChilds != null && currentChilds.size() > 1)
112           {
113             List<ZNRecord> records = accessor.getBaseDataAccessor().getChildren(parentPath, null, 0);
114             int transitionMsgCount = 0;
115             for (ZNRecord record : records)
116             {
117               Message msg = new Message(record);
118               if(msg.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString()))
119               {
120                 transitionMsgCount++;
121               }
122             }
123 
124             if (transitionMsgCount > 1)
125             {
126               success.set(false);
127               Assert.fail("Should not see more than 1 message");
128             }
129           }
130           
131           
132         }
133       });
134     }
135 
136     TestHelper.startController(clusterName,
137                                "controller_0",
138                                ZK_ADDR,
139                                HelixControllerMain.STANDALONE);
140     // start participants
141     for (int i = 0; i < 5; i++)
142     {
143       String instanceName = "localhost_" + (12918 + i);
144 
145       participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
146       participants[i].syncStart();
147     }
148 
149     boolean result =
150         ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
151                                                                               clusterName));
152     Assert.assertTrue(result);
153 
154     result =
155         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
156                                                                                  clusterName));
157     Assert.assertTrue(result);
158 
159     Assert.assertTrue(success.get());
160     
161 
162     // clean up
163     for (int i = 0; i < 5; i++)
164     {
165       participants[i].syncStop();
166     }
167 
168     System.out.println("END " + clusterName + " at "
169         + new Date(System.currentTimeMillis()));
170   }
171 }