1 package org.apache.helix.healthcheck;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23 import java.util.Map;
24 import java.util.Set;
25
26 import org.apache.helix.HelixDataAccessor;
27 import org.apache.helix.HelixManager;
28 import org.apache.helix.NotificationContext;
29 import org.apache.helix.TestHelper;
30 import org.apache.helix.ZNRecord;
31 import org.apache.helix.PropertyKey.Builder;
32 import org.apache.helix.TestHelper.StartCMResult;
33 import org.apache.helix.alerts.AlertValueAndStatus;
34 import org.apache.helix.controller.HelixControllerMain;
35 import org.apache.helix.healthcheck.HealthStatsAggregationTask;
36 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
37 import org.apache.helix.integration.ZkIntegrationTestBase;
38 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
39 import org.apache.helix.manager.zk.ZNRecordSerializer;
40 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
41 import org.apache.helix.manager.zk.ZkClient;
42 import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
43 import org.apache.helix.mock.participant.MockParticipant;
44 import org.apache.helix.mock.participant.MockTransition;
45 import org.apache.helix.model.Message;
46 import org.apache.helix.tools.ClusterSetup;
47 import org.apache.helix.tools.ClusterStateVerifier;
48 import org.testng.Assert;
49 import org.testng.annotations.AfterClass;
50 import org.testng.annotations.BeforeClass;
51 import org.testng.annotations.Test;
52
53
54 public class TestSimpleAlert extends ZkIntegrationTestBase
55 {
56 ZkClient _zkClient;
57 protected ClusterSetup _setupTool = null;
58 protected final String _alertStr = "EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
59 protected final String _alertStatusStr = _alertStr;
60 protected final String _dbName = "TestDB0";
61
62 @BeforeClass ()
63 public void beforeClass() throws Exception
64 {
65 _zkClient = new ZkClient(ZK_ADDR);
66 _zkClient.setZkSerializer(new ZNRecordSerializer());
67
68 _setupTool = new ClusterSetup(ZK_ADDR);
69 }
70
71 @AfterClass
72 public void afterClass()
73 {
74 _zkClient.close();
75 }
76
77 public class SimpleAlertTransition extends MockTransition
78 {
79 int _alertValue;
80 public SimpleAlertTransition(int value)
81 {
82 _alertValue = value;
83 }
84 @Override
85 public void doTransition(Message message, NotificationContext context)
86 {
87 HelixManager manager = context.getManager();
88 HelixDataAccessor accessor = manager.getHelixDataAccessor();
89 String fromState = message.getFromState();
90 String toState = message.getToState();
91 String instance = message.getTgtName();
92 String partition = message.getPartitionName();
93
94 if (fromState.equalsIgnoreCase("SLAVE")
95 && toState.equalsIgnoreCase("MASTER"))
96 {
97
98
99
100 ParticipantHealthReportCollectorImpl reporter =
101 new ParticipantHealthReportCollectorImpl(manager, instance);
102 MockEspressoHealthReportProvider provider = new
103 MockEspressoHealthReportProvider();
104 reporter.addHealthReportProvider(provider);
105 String statName = "latency";
106 provider.setStat(_dbName, statName,""+(0.1+_alertValue));
107 reporter.transmitHealthReports();
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 }
128 }
129
130 }
131
132 @Test()
133 public void testSimpleAlert() throws Exception
134 {
135 String clusterName = getShortClassName();
136 MockParticipant[] participants = new MockParticipant[5];
137
138 System.out.println("START TestSimpleAlert at " + new Date(System.currentTimeMillis()));
139
140 TestHelper.setupCluster(clusterName,
141 ZK_ADDR,
142 12918,
143 "localhost",
144 "TestDB",
145 1,
146 10,
147 5,
148 3,
149 "MasterSlave",
150 true);
151
152
153
154
155 StartCMResult cmResult = TestHelper.startController(clusterName,
156 "controller_0",
157 ZK_ADDR,
158 HelixControllerMain.STANDALONE);
159 cmResult._manager.startTimerTasks();
160 _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
161
162 for (int i = 0; i < 5; i++)
163 {
164 String instanceName = "localhost_" + (12918 + i);
165
166 participants[i] = new MockParticipant(clusterName,
167 instanceName,
168 ZK_ADDR,
169 new SimpleAlertTransition(15));
170 participants[i].syncStart();
171
172 }
173
174 boolean result = ClusterStateVerifier.verifyByPolling(
175 new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
176 Assert.assertTrue(result);
177
178
179
180 new HealthStatsAggregationTask(cmResult._manager).run();
181
182 Thread.sleep(3000);
183
184
185 ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
186 Builder keyBuilder = accessor.keyBuilder();
187
188
189
190 String instance = "localhost_12918";
191 ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
192 Map<String, Map<String,String>> recMap = record.getMapFields();
193 Set<String> keySet = recMap.keySet();
194 Map<String,String> alertStatusMap = recMap.get(_alertStatusStr);
195 String val = alertStatusMap.get(AlertValueAndStatus.VALUE_NAME);
196 boolean fired = Boolean.parseBoolean(alertStatusMap.get(AlertValueAndStatus.FIRED_NAME));
197 Assert.assertEquals(Double.parseDouble(val), Double.parseDouble("15.1"));
198 Assert.assertTrue(fired);
199
200
201 ZNRecord alertHistory = accessor.getProperty(keyBuilder.alertHistory()).getRecord();
202
203 String deltakey = (String) (alertHistory.getMapFields().keySet().toArray()[0]);
204 Map<String, String> delta = alertHistory.getMapField(deltakey);
205 Assert.assertTrue(delta.size() == 1);
206 Assert.assertTrue(delta.get("EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName#TestDB0.latency))CMP(GREATER)CON(10)--(%)").equals("ON"));
207
208
209 System.out.println("END TestSimpleAlert at " + new Date(System.currentTimeMillis()));
210 }
211 }