1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.TreeMap;
26 import java.util.concurrent.atomic.AtomicBoolean;
27
28 import org.I0Itec.zkclient.IZkChildListener;
29 import org.apache.helix.HelixAdmin;
30 import org.apache.helix.PropertyPathConfig;
31 import org.apache.helix.PropertyType;
32 import org.apache.helix.TestHelper;
33 import org.apache.helix.ZNRecord;
34 import org.apache.helix.PropertyKey.Builder;
35 import org.apache.helix.controller.HelixControllerMain;
36 import org.apache.helix.manager.zk.ZKHelixAdmin;
37 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
38 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
39 import org.apache.helix.mock.participant.MockParticipant;
40 import org.apache.helix.model.ClusterConstraints;
41 import org.apache.helix.model.ConstraintItem;
42 import org.apache.helix.model.Message;
43 import org.apache.helix.model.ClusterConstraints.ConstraintType;
44 import org.apache.helix.model.builder.ClusterConstraintsBuilder;
45 import org.apache.helix.model.builder.ConstraintItemBuilder;
46 import org.apache.helix.tools.ClusterStateVerifier;
47 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
48 import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
49 import org.testng.Assert;
50 import org.testng.annotations.Test;
51
52
53 public class TestMessageThrottle extends ZkIntegrationTestBase
54 {
55 @Test()
56 public void testMessageThrottle() throws Exception
57 {
58
59
60 String clusterName = getShortClassName();
61 MockParticipant[] participants = new MockParticipant[5];
62
63
64 System.out.println("START " + clusterName + " at "
65 + new Date(System.currentTimeMillis()));
66
67 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918,
68
69 "localhost",
70 "TestDB",
71 1,
72 10,
73 5,
74 3,
75 "MasterSlave",
76 true);
77
78
79
80 HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
81 ConstraintItemBuilder builder = new ConstraintItemBuilder();
82 builder.addConstraintAttribute("MESSAGE_TYPE", "STATE_TRANSITION")
83 .addConstraintAttribute("INSTANCE", ".*")
84 .addConstraintAttribute("CONSTRAINT_VALUE", "1");
85
86
87
88
89
90
91 admin.setConstraint(clusterName, ConstraintType.MESSAGE_CONSTRAINT, "constraint1", builder.build());
92
93
94 final ZKHelixDataAccessor accessor =
95 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
96
97
98 final AtomicBoolean success = new AtomicBoolean(true);
99 for (int i = 0; i < 5; i++)
100 {
101 String instanceName = "localhost_" + (12918 + i);
102 String msgPath =
103 PropertyPathConfig.getPath(PropertyType.MESSAGES, clusterName, instanceName);
104
105 _gZkClient.subscribeChildChanges(msgPath, new IZkChildListener()
106 {
107
108 @Override
109 public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
110 {
111 if (currentChilds != null && currentChilds.size() > 1)
112 {
113 List<ZNRecord> records = accessor.getBaseDataAccessor().getChildren(parentPath, null, 0);
114 int transitionMsgCount = 0;
115 for (ZNRecord record : records)
116 {
117 Message msg = new Message(record);
118 if(msg.getMsgType().equals(Message.MessageType.STATE_TRANSITION.toString()))
119 {
120 transitionMsgCount++;
121 }
122 }
123
124 if (transitionMsgCount > 1)
125 {
126 success.set(false);
127 Assert.fail("Should not see more than 1 message");
128 }
129 }
130
131
132 }
133 });
134 }
135
136 TestHelper.startController(clusterName,
137 "controller_0",
138 ZK_ADDR,
139 HelixControllerMain.STANDALONE);
140
141 for (int i = 0; i < 5; i++)
142 {
143 String instanceName = "localhost_" + (12918 + i);
144
145 participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR);
146 participants[i].syncStart();
147 }
148
149 boolean result =
150 ClusterStateVerifier.verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR,
151 clusterName));
152 Assert.assertTrue(result);
153
154 result =
155 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
156 clusterName));
157 Assert.assertTrue(result);
158
159 Assert.assertTrue(success.get());
160
161
162
163 for (int i = 0; i < 5; i++)
164 {
165 participants[i].syncStop();
166 }
167
168 System.out.println("END " + clusterName + " at "
169 + new Date(System.currentTimeMillis()));
170 }
171 }