View Javadoc

1   package org.apache.helix.healthcheck;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.util.Date;
24  import java.util.HashMap;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.concurrent.ConcurrentHashMap;
28  
29  import javax.management.AttributeNotFoundException;
30  import javax.management.InstanceNotFoundException;
31  import javax.management.IntrospectionException;
32  import javax.management.MBeanAttributeInfo;
33  import javax.management.MBeanException;
34  import javax.management.MBeanInfo;
35  import javax.management.MBeanServerConnection;
36  import javax.management.MBeanServerNotification;
37  import javax.management.MalformedObjectNameException;
38  import javax.management.ObjectName;
39  import javax.management.ReflectionException;
40  
41  import org.apache.helix.HelixDataAccessor;
42  import org.apache.helix.HelixManager;
43  import org.apache.helix.NotificationContext;
44  import org.apache.helix.TestHelper;
45  import org.apache.helix.ZNRecord;
46  import org.apache.helix.PropertyKey.Builder;
47  import org.apache.helix.TestHelper.StartCMResult;
48  import org.apache.helix.alerts.AlertValueAndStatus;
49  import org.apache.helix.controller.HelixControllerMain;
50  import org.apache.helix.healthcheck.HealthStatsAggregationTask;
51  import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
52  import org.apache.helix.integration.ZkIntegrationTestBase;
53  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
54  import org.apache.helix.manager.zk.ZNRecordSerializer;
55  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
56  import org.apache.helix.manager.zk.ZkClient;
57  import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
58  import org.apache.helix.mock.participant.MockParticipant;
59  import org.apache.helix.mock.participant.MockTransition;
60  import org.apache.helix.model.Message;
61  import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
62  import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
63  import org.apache.helix.tools.ClusterSetup;
64  import org.apache.helix.tools.ClusterStateVerifier;
65  import org.apache.log4j.Logger;
66  import org.testng.Assert;
67  import org.testng.annotations.AfterClass;
68  import org.testng.annotations.BeforeClass;
69  import org.testng.annotations.Test;
70  
71  
72  public class TestWildcardAlert extends ZkIntegrationTestBase
73  {
74    public static class TestClusterMBeanObserver extends ClusterMBeanObserver
75    {
76      public Map<String, Map<String, Object>> _beanValueMap = new ConcurrentHashMap<String, Map<String, Object>>();
77  
78      public TestClusterMBeanObserver(String domain) throws InstanceNotFoundException, IOException,
79          MalformedObjectNameException, NullPointerException
80      {
81        super(domain);
82      }
83  
84      @Override
85      public void onMBeanRegistered(MBeanServerConnection server,
86          MBeanServerNotification mbsNotification)
87      {
88        try
89        {
90          MBeanInfo info = _server.getMBeanInfo(mbsNotification.getMBeanName());
91          MBeanAttributeInfo[] infos = info.getAttributes();
92          _beanValueMap.put(mbsNotification.getMBeanName().toString(), new ConcurrentHashMap<String, Object>());
93          for (MBeanAttributeInfo infoItem : infos)
94          {
95            Object val = _server.getAttribute(mbsNotification.getMBeanName(), infoItem.getName());
96            System.out.println("         " + infoItem.getName() + " : "
97                + _server.getAttribute(mbsNotification.getMBeanName(), infoItem.getName())
98                + " type : " + infoItem.getType());
99            _beanValueMap.get(mbsNotification.getMBeanName().toString()).put(infoItem.getName(), val);
100         }
101       } catch (Exception e)
102       {
103         _logger.error("Error getting bean info, domain=" + _domain, e);
104       }
105     }
106 
107     @Override
108     public void onMBeanUnRegistered(MBeanServerConnection server,
109         MBeanServerNotification mbsNotification)
110     {
111       _beanValueMap.remove(mbsNotification.getMBeanName().toString());
112     }
113 
114     public void refresh() throws MalformedObjectNameException, NullPointerException, InstanceNotFoundException, IntrospectionException, ReflectionException, IOException, AttributeNotFoundException, MBeanException
115     {
116       for(String beanName: _beanValueMap.keySet())
117       {
118         ObjectName objName = new ObjectName(beanName);
119         MBeanInfo info = _server.getMBeanInfo(objName);
120         MBeanAttributeInfo[] infos = info.getAttributes();
121         _beanValueMap.put(objName.toString(), new HashMap<String, Object>());
122         for(MBeanAttributeInfo infoItem : infos)
123         {
124           Object val = _server.getAttribute(objName, infoItem.getName());
125           System.out.println("         " + infoItem.getName() + " : " + _server.getAttribute(objName, infoItem.getName()) + " type : " + infoItem.getType());
126           _beanValueMap.get(objName.toString()).put(infoItem.getName(), val);
127         }
128       }
129     }
130 
131   }
132 
133   private static final Logger _logger = Logger.getLogger(TestWildcardAlert.class);
134   ZkClient _zkClient;
135   protected ClusterSetup _setupTool = null;
136   protected final String _alertStr = "EXP(decay(1)(localhost_*.RestQueryStats@DBName=TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)";
137   protected final String _alertStatusStr = _alertStr; // +" : (*)";
138   protected final String _dbName = "TestDB0";
139 
140   @BeforeClass()
141   public void beforeClass() throws Exception
142   {
143     _zkClient = new ZkClient(ZK_ADDR);
144     _zkClient.setZkSerializer(new ZNRecordSerializer());
145 
146     _setupTool = new ClusterSetup(ZK_ADDR);
147   }
148 
149   @AfterClass
150   public void afterClass()
151   {
152     _zkClient.close();
153   }
154 
155   public class WildcardAlertTransition extends MockTransition
156   {
157     @Override
158     public void doTransition(Message message, NotificationContext context)
159     {
160       HelixManager manager = context.getManager();
161       HelixDataAccessor accessor = manager.getHelixDataAccessor();
162       String fromState = message.getFromState();
163       String toState = message.getToState();
164       String instance = message.getTgtName();
165       String partition = message.getPartitionName();
166 
167       if (fromState.equalsIgnoreCase("SLAVE") && toState.equalsIgnoreCase("MASTER"))
168       {
169     	//add a stat and report to ZK
170     	//perhaps should keep reporter per instance...
171     	ParticipantHealthReportCollectorImpl reporter =
172     			new ParticipantHealthReportCollectorImpl(manager, instance);
173     	MockEspressoHealthReportProvider provider = new
174     			MockEspressoHealthReportProvider();
175     	reporter.addHealthReportProvider(provider);
176     	String statName = "latency";
177     	//using constant as timestamp so that when each partition does this transition,
178     	//they do not advance timestamp, and no stats double-counted
179     	String timestamp = "12345";
180     	provider.setStat(_dbName, statName,"15", timestamp);
181 
182 
183     	//sleep for random time and see about errors.
184     	/*
185     	Random r = new Random();
186     	int x = r.nextInt(30000);
187     	try {
188 			Thread.sleep(x);
189 		} catch (InterruptedException e) {
190 			// TODO Auto-generated catch block
191 			e.printStackTrace();
192 		}
193 		*/
194 
195      	reporter.transmitHealthReports();
196 
197     	/*
198         for (int i = 0; i < 5; i++)
199         {
200           accessor.setProperty(PropertyType.HEALTHREPORT,
201                                new ZNRecord("mockAlerts" + i),
202                                instance,
203                                "mockAlerts");
204           try
205           {
206             Thread.sleep(1000);
207           }
208           catch (InterruptedException e)
209           {
210             // TODO Auto-generated catch block
211             e.printStackTrace();
212           }
213         }
214         */
215       }
216     }
217 
218   }
219 
220   @Test()
221   public void testWildcardAlert() throws Exception
222   {
223     String clusterName = getShortClassName();
224     MockParticipant[] participants = new MockParticipant[5];
225 
226     System.out.println("START TestWildcardAlert at " + new Date(System.currentTimeMillis()));
227 
228     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start
229                                                          // port
230         "localhost", // participant name prefix
231         "TestDB", // resource name prefix
232         1, // resources
233         10, // partitions per resource
234         5, // number of nodes //change back to 5!!!
235         3, // replicas //change back to 3!!!
236         "MasterSlave", true); // do rebalance
237 
238     // enableHealthCheck(clusterName);
239 
240     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
241     // _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr2);
242 
243     StartCMResult cmResult = TestHelper.startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE);
244     // start participants
245     for (int i = 0; i < 5; i++) // !!!change back to 5
246     {
247       String instanceName = "localhost_" + (12918 + i);
248 
249       participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR,
250           new WildcardAlertTransition());
251       participants[i].syncStart();
252 //      new Thread(participants[i]).start();
253     }
254 
255     TestClusterMBeanObserver jmxMBeanObserver = new TestClusterMBeanObserver(
256         ClusterAlertMBeanCollection.DOMAIN_ALERT);
257 
258     boolean result = ClusterStateVerifier.verifyByPolling(
259         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
260     Assert.assertTrue(result);
261     Thread.sleep(3000);
262     // HealthAggregationTask is supposed to run by a timer every 30s
263     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
264     new HealthStatsAggregationTask(cmResult._manager).run();
265 
266     //sleep for a few seconds to give stats stage time to trigger and for bean to trigger
267     Thread.sleep(3000);
268 
269     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
270     Builder keyBuilder = accessor.keyBuilder();
271 
272     // for (int i = 0; i < 1; i++) //change 1 back to 5
273     // {
274     // String instance = "localhost_" + (12918 + i);
275     // String instance = "localhost_12918";
276     ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
277     Map<String, Map<String, String>> recMap = record.getMapFields();
278     Set<String> keySet = recMap.keySet();
279     Map<String, String> alertStatusMap = recMap.get(_alertStatusStr);
280     String val = alertStatusMap.get(AlertValueAndStatus.VALUE_NAME);
281     boolean fired = Boolean.parseBoolean(alertStatusMap.get(AlertValueAndStatus.FIRED_NAME));
282     Assert.assertEquals(Double.parseDouble(val), Double.parseDouble("75.0"));
283     Assert.assertTrue(fired);
284 
285 
286     // Make sure that the jmxObserver has received all the jmx bean value that is corresponding
287     //to the alerts.
288     jmxMBeanObserver.refresh();
289     Assert.assertTrue(jmxMBeanObserver._beanValueMap.size() >= 1);
290 
291     String beanName = "HelixAlerts:alert=EXP(decay(1)(localhost_%.RestQueryStats@DBName#TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)--(%)";
292     Assert.assertTrue(jmxMBeanObserver._beanValueMap.containsKey(beanName));
293 
294     Map<String, Object> beanValueMap = jmxMBeanObserver._beanValueMap.get(beanName);
295     Assert.assertEquals(beanValueMap.size(), 4);
296     Assert.assertEquals((beanValueMap.get("AlertFired")), new Integer(1));
297     Assert.assertEquals((beanValueMap.get("AlertValue")), new Double(75.0));
298     Assert
299     .assertEquals(
300     		(String) (beanValueMap.get("SensorName")),
301     		"EXP(decay(1)(localhost_%.RestQueryStats@DBName#TestDB0.latency)|EXPAND|SUMEACH)CMP(GREATER)CON(10)--(%)");
302     // }
303 
304     System.out.println("END TestWildcardAlert at " + new Date(System.currentTimeMillis()));
305   }
306 }