View Javadoc

1   package org.apache.helix.healthcheck;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.Map;
24  import java.util.Set;
25  
26  import org.apache.helix.HelixDataAccessor;
27  import org.apache.helix.HelixManager;
28  import org.apache.helix.NotificationContext;
29  import org.apache.helix.TestHelper;
30  import org.apache.helix.ZNRecord;
31  import org.apache.helix.PropertyKey.Builder;
32  import org.apache.helix.TestHelper.StartCMResult;
33  import org.apache.helix.controller.HelixControllerMain;
34  import org.apache.helix.healthcheck.HealthStatsAggregationTask;
35  import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
36  import org.apache.helix.integration.ZkIntegrationTestBase;
37  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
38  import org.apache.helix.manager.zk.ZNRecordSerializer;
39  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
40  import org.apache.helix.manager.zk.ZkClient;
41  import org.apache.helix.mock.participant.MockEspressoHealthReportProvider;
42  import org.apache.helix.mock.participant.MockParticipant;
43  import org.apache.helix.mock.participant.MockTransition;
44  import org.apache.helix.model.Message;
45  import org.apache.helix.tools.ClusterSetup;
46  import org.apache.helix.tools.ClusterStateVerifier;
47  import org.testng.Assert;
48  import org.testng.annotations.AfterClass;
49  import org.testng.annotations.BeforeClass;
50  import org.testng.annotations.Test;
51  
52  
53  public class TestAddDropAlert extends ZkIntegrationTestBase
54  {
55    ZkClient _zkClient;
56    protected ClusterSetup _setupTool = null;
57    protected final String _alertStr =
58        "EXP(accumulate()(localhost_12918.RestQueryStats@DBName=TestDB0.latency))CMP(GREATER)CON(10)";
59    protected final String _alertStatusStr = _alertStr; // +" : (*)";
60    protected final String _dbName = "TestDB0";
61  
62    @BeforeClass()
63    public void beforeClass() throws Exception
64    {
65      _zkClient = new ZkClient(ZK_ADDR);
66      _zkClient.setZkSerializer(new ZNRecordSerializer());
67  
68      _setupTool = new ClusterSetup(ZK_ADDR);
69    }
70  
71    @AfterClass
72    public void afterClass()
73    {
74      _zkClient.close();
75    }
76  
77    public class AddDropAlertTransition extends MockTransition
78    {
79      @Override
80      public void doTransition(Message message, NotificationContext context)
81      {
82        HelixManager manager = context.getManager();
83        HelixDataAccessor accessor = manager.getHelixDataAccessor();
84        String fromState = message.getFromState();
85        String toState = message.getToState();
86        String instance = message.getTgtName();
87        String partition = message.getPartitionName();
88  
89        if (fromState.equalsIgnoreCase("SLAVE") && toState.equalsIgnoreCase("MASTER"))
90        {
91  
92          // add a stat and report to ZK
93          // perhaps should keep reporter per instance...
94          ParticipantHealthReportCollectorImpl reporter =
95              new ParticipantHealthReportCollectorImpl(manager, instance);
96          MockEspressoHealthReportProvider provider =
97              new MockEspressoHealthReportProvider();
98          reporter.addHealthReportProvider(provider);
99          String statName = "latency";
100         provider.setStat(_dbName, statName, "15");
101         reporter.transmitHealthReports();
102 
103         // sleep long enough for first set of alerts to report and alert to get deleted
104         // then change reported data
105         try
106         {
107           Thread.sleep(10000);
108         }
109         catch (InterruptedException e)
110         {
111           System.err.println("Error sleeping");
112         }
113         provider.setStat(_dbName, statName, "1");
114         reporter.transmitHealthReports();
115 
116         /*
117          * for (int i = 0; i < 5; i++) { accessor.setProperty(PropertyType.HEALTHREPORT,
118          * new ZNRecord("mockAlerts" + i), instance, "mockAlerts"); try {
119          * Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated
120          * catch block e.printStackTrace(); } }
121          */
122       }
123     }
124   }
125 
126   @Test()
127   public void testAddDropAlert() throws Exception
128   {
129     String clusterName = getShortClassName();
130     MockParticipant[] participants = new MockParticipant[5];
131 
132     System.out.println("START TestAddDropAlert at "
133         + new Date(System.currentTimeMillis()));
134 
135     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
136                             "localhost", // participant name prefix
137                             "TestDB", // resource name prefix
138                             1, // resources
139                             10, // partitions per resource group
140                             5, // number of nodes //change back to 5!!!
141                             1, // replicas //change back to 3!!!
142                             "MasterSlave",
143                             true); // do rebalance
144     // enableHealthCheck(clusterName);
145 
146     _setupTool.getClusterManagementTool().addAlert(clusterName, _alertStr);
147 
148     StartCMResult cmResult =
149         TestHelper.startController(clusterName,
150                                    "controller_0",
151                                    ZK_ADDR,
152                                    HelixControllerMain.STANDALONE);
153     // start participants
154     for (int i = 0; i < 5; i++) // !!!change back to 5
155     {
156       String instanceName = "localhost_" + (12918 + i);
157 
158       participants[i] =
159           new MockParticipant(clusterName,
160                               instanceName,
161                               ZK_ADDR,
162                               new AddDropAlertTransition());
163       participants[i].syncStart();
164 //      new Thread(participants[i]).start();
165     }
166 
167     boolean result =
168         ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
169                                                                                           clusterName));
170     Assert.assertTrue(result);
171 
172     // drop alert soon after adding, but leave enough time for alert to fire once
173     // Thread.sleep(3000);
174     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient));
175     Builder keyBuilder = accessor.keyBuilder();
176 
177     new HealthStatsAggregationTask(cmResult._manager).run();
178     String instance = "localhost_12918";
179     ZNRecord record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
180     Map<String, Map<String, String>> recMap = record.getMapFields();
181     Set<String> keySet = recMap.keySet();
182     Assert.assertTrue(keySet.size() > 0);
183 
184     _setupTool.getClusterManagementTool().dropAlert(clusterName, _alertStr);
185     new HealthStatsAggregationTask(cmResult._manager).run();
186     // other verifications go here
187     // for (int i = 0; i < 1; i++) //change 1 back to 5
188     // {
189     // String instance = "localhost_" + (12918 + i);
190     record = accessor.getProperty(keyBuilder.alertStatus()).getRecord();
191     recMap = record.getMapFields();
192     keySet = recMap.keySet();
193     Assert.assertEquals(keySet.size(), 0);
194     // }
195 
196     System.out.println("END TestAddDropAlert at " + new Date(System.currentTimeMillis()));
197   }
198 }