View Javadoc

1   package org.apache.helix.healthcheck;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.Map;
24  import java.util.Set;
25  
26  import org.apache.helix.HelixDataAccessor;
27  import org.apache.helix.HelixManager;
28  import org.apache.helix.NotificationContext;
29  import org.apache.helix.TestHelper;
30  import org.apache.helix.ZNRecord;
31  import org.apache.helix.PropertyKey.Builder;
32  import org.apache.helix.TestHelper.StartCMResult;
33  import org.apache.helix.alerts.AlertValueAndStatus;
34  import org.apache.helix.controller.HelixControllerMain;
35  import org.apache.helix.healthcheck.HealthStatsAggregationTask;
36  import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
37  import org.apache.helix.integration.ZkIntegrationTestBase;
38  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
39  import org.apache.helix.manager.zk.ZNRecordSerializer;
40  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
41  import org.apache.helix.manager.zk.ZkClient;
42  import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
43  import org.apache.helix.mock.participant.MockParticipant;
44  import org.apache.helix.mock.participant.MockTransition;
45  import org.apache.helix.model.Message;
46  import org.apache.helix.tools.ClusterSetup;
47  import org.apache.helix.tools.ClusterStateVerifier;
48  import org.testng.Assert;
49  import org.testng.annotations.AfterClass;
50  import org.testng.annotations.BeforeClass;
51  import org.testng.annotations.Test;
52  
53  
54  public class TestSimpleAlert extends ZkIntegrationTestBase
55  {
56    ZkClient _zkClient;
57    protected ClusterSetup _setupTool = null;
58    protected final String _alertStr = "EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
59    protected final String _alertStatusStr = _alertStr; //+" : (*)";
60    protected final String _dbName = "TestDB0";
61  
62    @BeforeClass ()
63    public void beforeClass() throws Exception
64    {
65      _zkClient = new ZkClient(ZK_ADDR);
66      _zkClient.setZkSerializer(new ZNRecordSerializer());
67  
68      _setupTool = new ClusterSetup(ZK_ADDR);
69    }
70  
71    @AfterClass
72    public void afterClass()
73    {
74      _zkClient.close();
75    }
76  
77    public class SimpleAlertTransition extends MockTransition
78    {
79      int _alertValue;
80      public SimpleAlertTransition(int value)
81      {
82        _alertValue = value;
83      }
84      @Override
85      public void doTransition(Message message, NotificationContext context)
86      {
87        HelixManager manager = context.getManager();
88        HelixDataAccessor accessor = manager.getHelixDataAccessor();
89        String fromState = message.getFromState();
90        String toState = message.getToState();
91        String instance = message.getTgtName();
92        String partition = message.getPartitionName();
93  
94        if (fromState.equalsIgnoreCase("SLAVE")
95            && toState.equalsIgnoreCase("MASTER"))
96        {
97  
98      	//add a stat and report to ZK
99      	//perhaps should keep reporter per instance...
100     	ParticipantHealthReportCollectorImpl reporter =
101     			new ParticipantHealthReportCollectorImpl(manager, instance);
102     	MockEspressoHealthReportProvider provider = new
103     			MockEspressoHealthReportProvider();
104     	reporter.addHealthReportProvider(provider);
105     	String statName = "latency";
106     	provider.setStat(_dbName, statName,""+(0.1+_alertValue));
107     	reporter.transmitHealthReports();
108 
109     	/*
110         for (int i = 0; i < 5; i++)
111         {
112           accessor.setProperty(PropertyType.HEALTHREPORT,
113                                new ZNRecord("mockAlerts" + i),
114                                instance,
115                                "mockAlerts");
116           try
117           {
118             Thread.sleep(1000);
119           }
120           catch (InterruptedException e)
121           {
122             // TODO Auto-generated catch block
123             e.printStackTrace();
124           }
125         }
126         */
127       }
128     }
129 
130   }
131 
132   @Test()
133   public void testSimpleAlert() throws Exception
134   {
135     String clusterName = getShortClassName();
136     MockParticipant[] participants = new MockParticipant[5];
137 
138     System.out.println("START TestSimpleAlert at " + new Date(System.currentTimeMillis()));
139 
140     TestHelper.setupCluster(clusterName,
141                             ZK_ADDR,
142                             12918,        // participant start port
143                             "localhost",  // participant name prefix
144                             "TestDB",     // resource  name prefix
145                             1,            // resources
146                             10,           // partitions per resource
147                             5,            // number of nodes //change back to 5!!!
148                             3,            // replicas //change back to 3!!!
149                             "MasterSlave",
150                             true);        // do rebalance
151 
152     // enableHealthCheck(clusterName);
153 
154 
155     StartCMResult cmResult = TestHelper.startController(clusterName,
156                                "controller_0",
157                                ZK_ADDR,
158                                HelixControllerMain.STANDALONE);
159     cmResult._manager.startTimerTasks();
160     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
161     // start participants
162     for (int i = 0; i < 5; i++) //!!!change back to 5
163     {
164       String instanceName = "localhost_" + (12918 + i);
165 
166       participants[i] = new MockParticipant(clusterName,
167                                             instanceName,
168                                             ZK_ADDR,
169                                             new SimpleAlertTransition(15));
170       participants[i].syncStart();
171 //      new Thread(participants[i]).start();
172     }
173 
174     boolean result = ClusterStateVerifier.verifyByPolling(
175         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
176     Assert.assertTrue(result);
177 
178     // HealthAggregationTask is supposed to run by a timer every 30s
179     // To make sure HealthAggregationTask is run, we invoke it explicitly for this test
180     new HealthStatsAggregationTask(cmResult._manager).run();
181     //sleep for a few seconds to give stats stage time to trigger
182     Thread.sleep(3000);
183 
184     // other verifications go here
185     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
186     Builder keyBuilder = accessor.keyBuilder();
187     //for (int i = 0; i < 1; i++) //change 1 back to 5
188     //{
189       //String instance = "localhost_" + (12918 + i);
190       String instance = "localhost_12918";
191       ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
192       Map<String, Map<String,String>> recMap = record.getMapFields();
193       Set<String> keySet = recMap.keySet();
194       Map<String,String> alertStatusMap = recMap.get(_alertStatusStr);
195       String val = alertStatusMap.get(AlertValueAndStatus.VALUE_NAME);
196       boolean fired = Boolean.parseBoolean(alertStatusMap.get(AlertValueAndStatus.FIRED_NAME));
197       Assert.assertEquals(Double.parseDouble(val), Double.parseDouble("15.1"));
198       Assert.assertTrue(fired);
199       
200       // Verify Alert history from ZK
201       ZNRecord alertHistory = accessor.getProperty(keyBuilder.alertHistory()).getRecord();
202       
203       String deltakey = (String) (alertHistory.getMapFields().keySet().toArray()[0]);
204       Map<String, String> delta = alertHistory.getMapField(deltakey);
205       Assert.assertTrue(delta.size() == 1);
206       Assert.assertTrue(delta.get("EXP(decay(1.0)(localhost_12918.RestQueryStats@DBName#TestDB0.latency))CMP(GREATER)CON(10)--(%)").equals("ON"));
207     //}
208 
209     System.out.println("END TestSimpleAlert at " + new Date(System.currentTimeMillis()));
210   }
211 }