View Javadoc

1   package org.apache.helix.healthcheck;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixTimerTask;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ReadHealthDataStage;
import org.apache.helix.controller.stages.StatsAggregationStage;
import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
import org.apache.log4j.Logger;
42  
43  
44  public class HealthStatsAggregationTask extends HelixTimerTask
45  {
46    private static final Logger LOG = Logger.getLogger(HealthStatsAggregationTask.class);
47  
48    public final static int DEFAULT_HEALTH_CHECK_LATENCY = 30 * 1000;
49  
50    private Timer _timer;
51    private final HelixManager _manager;
52    private final Pipeline _healthStatsAggregationPipeline;
53    private final int _delay;
54    private final int _period;
55    private final ClusterAlertMBeanCollection _alertItemCollection;
56    private final Map<String, HelixStageLatencyMonitor> _stageLatencyMonitorMap =
57        new HashMap<String, HelixStageLatencyMonitor>();
58  
59    public HealthStatsAggregationTask(HelixManager manager, int delay, int period)
60    {
61      _manager = manager;
62      _delay = delay;
63      _period = period;
64  
65      // health stats pipeline
66      _healthStatsAggregationPipeline = new Pipeline();
67      _healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
68      StatsAggregationStage statAggregationStage = new StatsAggregationStage();
69      _healthStatsAggregationPipeline.addStage(statAggregationStage);
70      _alertItemCollection = statAggregationStage.getClusterAlertMBeanCollection();
71  
72      registerStageLatencyMonitor(_healthStatsAggregationPipeline);
73    }
74  
75    public HealthStatsAggregationTask(HelixManager manager)
76    {
77      this(manager, DEFAULT_HEALTH_CHECK_LATENCY, DEFAULT_HEALTH_CHECK_LATENCY);
78    }
79  
80    private void registerStageLatencyMonitor(Pipeline pipeline)
81    {
82      for (Stage stage : pipeline.getStages())
83      {
84        String stgName = stage.getStageName();
85        if (!_stageLatencyMonitorMap.containsKey(stgName))
86        {
87          try
88          {
89            _stageLatencyMonitorMap.put(stage.getStageName(),
90                                        new HelixStageLatencyMonitor(_manager.getClusterName(),
91                                                                     stgName));
92          }
93          catch (Exception e)
94          {
95            LOG.error("Couldn't create StageLatencyMonitor mbean for stage: " + stgName, e);
96          }
97        }
98        else
99        {
100         LOG.error("StageLatencyMonitor for stage: " + stgName
101             + " already exists. Skip register it");
102       }
103     }
104   }
105 
106   @Override
107   public void start()
108   {
109     LOG.info("START HealthAggregationTask");
110 
111     if (_timer == null)
112     {
113       // Remove all the previous health check values, if any
114       HelixDataAccessor accessor = _manager.getHelixDataAccessor();
115       List<String> existingHealthRecordNames = accessor.getChildNames(accessor.keyBuilder().healthReports(_manager.getInstanceName()));
116       for(String healthReportName : existingHealthRecordNames)
117       {
118         LOG.info("Removing old healthrecord " + healthReportName);
119         accessor.removeProperty(accessor.keyBuilder().healthReport(_manager.getInstanceName(),healthReportName));
120       }
121       
122       _timer = new Timer(true);
123       _timer.scheduleAtFixedRate(this, new Random().nextInt(_delay), _period);
124     }
125     else
126     {
127       LOG.warn("timer already started");
128     }
129   }
130 
131   @Override
132   public synchronized void stop()
133   {
134     LOG.info("Stop HealthAggregationTask");
135 
136     if (_timer != null)
137     {
138       _timer.cancel();
139       _timer = null;
140       _alertItemCollection.reset();
141 
142       for (HelixStageLatencyMonitor stgLatencyMonitor : _stageLatencyMonitorMap.values())
143       {
144         stgLatencyMonitor.reset();
145       }
146     }
147     else
148     {
149       LOG.warn("timer already stopped");
150     }
151   }
152 
153   @Override
154   public synchronized void run()
155   {
156     if (!isEnabled())
157     {
158       LOG.info("HealthAggregationTask is disabled.");
159       return;
160     }
161     
162     if (!_manager.isLeader())
163     {
164       LOG.error("Cluster manager: " + _manager.getInstanceName()
165           + " is not leader. Pipeline will not be invoked");
166       return;
167     }
168 
169     try
170     {
171       ClusterEvent event = new ClusterEvent("healthChange");
172       event.addAttribute("helixmanager", _manager);
173       event.addAttribute("HelixStageLatencyMonitorMap", _stageLatencyMonitorMap);
174 
175       _healthStatsAggregationPipeline.handle(event);
176       _healthStatsAggregationPipeline.finish();
177     }
178     catch (Exception e)
179     {
180       LOG.error("Exception while executing pipeline: " + _healthStatsAggregationPipeline,
181                 e);
182     }
183   }
184 
185   private boolean isEnabled()
186   {
187     ConfigAccessor configAccessor = _manager.getConfigAccessor();
188     boolean enabled = true;
189     if (configAccessor != null)
190     {
191       // zk-based cluster manager
192       ConfigScope scope =
193           new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
194       String isEnabled = configAccessor.get(scope, "healthChange.enabled");
195       if (isEnabled != null)
196       {
197         enabled = new Boolean(isEnabled);
198       }
199     }
200     else
201     {
202       LOG.debug("File-based cluster manager doesn't support disable healthChange");
203     }
204     return enabled;
205   }
206 
207 }