1 package org.apache.helix.healthcheck;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23 import java.util.Map;
24 import java.util.Set;
25
26 import org.apache.helix.HelixDataAccessor;
27 import org.apache.helix.HelixManager;
28 import org.apache.helix.NotificationContext;
29 import org.apache.helix.TestHelper;
30 import org.apache.helix.ZNRecord;
31 import org.apache.helix.PropertyKey.Builder;
32 import org.apache.helix.TestHelper.StartCMResult;
33 import org.apache.helix.controller.HelixControllerMain;
34 import org.apache.helix.healthcheck.HealthStatsAggregationTask;
35 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
36 import org.apache.helix.integration.ZkIntegrationTestBase;
37 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
38 import org.apache.helix.manager.zk.ZNRecordSerializer;
39 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
40 import org.apache.helix.manager.zk.ZkClient;
41 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
42 import org.apache.helix.mock.participant.MockParticipant;
43 import org.apache.helix.mock.participant.MockTransition;
44 import org.apache.helix.model.Message;
45 import org.apache.helix.tools.ClusterSetup;
46 import org.apache.helix.tools.ClusterStateVerifier;
47 import org.testng.Assert;
48 import org.testng.annotations.AfterClass;
49 import org.testng.annotations.BeforeClass;
50 import org.testng.annotations.Test;
51
52
53 public class TestAddDropAlert extends ZkIntegrationTestBase
54 {
55 ZkClient _zkClient;
56 protected ClusterSetup _setupTool = null;
57 protected final String _alertStr =
58 "EXP(accumulate()(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
59 protected final String _alertStatusStr = _alertStr;
60 protected final String _dbName = "TestDB0";
61
62 @BeforeClass()
63 public void beforeClass() throws Exception
64 {
65 _zkClient = new ZkClient(ZK_ADDR);
66 _zkClient.setZkSerializer(new ZNRecordSerializer());
67
68 _setupTool = new ClusterSetup(ZK_ADDR);
69 }
70
71 @AfterClass
72 public void afterClass()
73 {
74 _zkClient.close();
75 }
76
77 public class AddDropAlertTransition extends MockTransition
78 {
79 @Override
80 public void doTransition(Message message, NotificationContext context)
81 {
82 HelixManager manager = context.getManager();
83 HelixDataAccessor accessor = manager.getHelixDataAccessor();
84 String fromState = message.getFromState();
85 String toState = message.getToState();
86 String instance = message.getTgtName();
87 String partition = message.getPartitionName();
88
89 if (fromState.equalsIgnoreCase("SLAVE") && toState.equalsIgnoreCase("MASTER"))
90 {
91
92
93
94 ParticipantHealthReportCollectorImpl reporter =
95 new ParticipantHealthReportCollectorImpl(manager, instance);
96 MockEspressoHealthReportProvider provider =
97 new MockEspressoHealthReportProvider();
98 reporter.addHealthReportProvider(provider);
99 String statName = "latency";
100 provider.setStat(_dbName, statName, "15");
101 reporter.transmitHealthReports();
102
103
104
105 try
106 {
107 Thread.sleep(10000);
108 }
109 catch (InterruptedException e)
110 {
111 System.err.println("Error sleeping");
112 }
113 provider.setStat(_dbName, statName, "1");
114 reporter.transmitHealthReports();
115
116
117
118
119
120
121
122 }
123 }
124 }
125
126 @Test()
127 public void testAddDropAlert() throws Exception
128 {
129 String clusterName = getShortClassName();
130 MockParticipant[] participants = new MockParticipant[5];
131
132 System.out.println("START TestAddDropAlert at "
133 + new Date(System.currentTimeMillis()));
134
135 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918,
136 "localhost",
137 "TestDB",
138 1,
139 10,
140 5,
141 1,
142 "MasterSlave",
143 true);
144
145
146 _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
147
148 StartCMResult cmResult =
149 TestHelper.startController(clusterName,
150 "controller_0",
151 ZK_ADDR,
152 HelixControllerMain.STANDALONE);
153
154 for (int i = 0; i < 5; i++)
155 {
156 String instanceName = "localhost_" + (12918 + i);
157
158 participants[i] =
159 new MockParticipant(clusterName,
160 instanceName,
161 ZK_ADDR,
162 new AddDropAlertTransition());
163 participants[i].syncStart();
164
165 }
166
167 boolean result =
168 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
169 clusterName));
170 Assert.assertTrue(result);
171
172
173
174 ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
175 Builder keyBuilder = accessor.keyBuilder();
176
177 new HealthStatsAggregationTask(cmResult._manager).run();
178 String instance = "localhost_12918";
179 ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
180 Map<String, Map<String, String>> recMap = record.getMapFields();
181 Set<String> keySet = recMap.keySet();
182 Assert.assertTrue(keySet.size() > 0);
183
184 _setupTool.getClusterManagementTool().dropAlert(clusterName, _alertStr);
185 new HealthStatsAggregationTask(cmResult._manager).run();
186
187
188
189
190 record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
191 recMap = record.getMapFields();
192 keySet = recMap.keySet();
193 Assert.assertEquals(keySet.size(), 0);
194
195
196 System.out.println("END TestAddDropAlert at " + new Date(System.currentTimeMillis()));
197 }
198 }