1 package org.apache.helix.healthcheck;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Random;
26 import java.util.Timer;
27
28 import org.apache.helix.ConfigAccessor;
29 import org.apache.helix.model.ConfigScope;
30 import org.apache.helix.model.builder.ConfigScopeBuilder;
31 import org.apache.helix.HelixDataAccessor;
32 import org.apache.helix.HelixManager;
33 import org.apache.helix.HelixTimerTask;
34 import org.apache.helix.controller.pipeline.Pipeline;
35 import org.apache.helix.controller.pipeline.Stage;
36 import org.apache.helix.controller.stages.ClusterEvent;
37 import org.apache.helix.controller.stages.ReadHealthDataStage;
38 import org.apache.helix.controller.stages.StatsAggregationStage;
39 import org.apache.helix.monitoring.mbeans.ClusterAlertMBeanCollection;
40 import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
41 import org.apache.log4j.Logger;
42
43
44 public class HealthStatsAggregationTask extends HelixTimerTask
45 {
46 private static final Logger LOG = Logger.getLogger(HealthStatsAggregationTask.class);
47
48 public final static int DEFAULT_HEALTH_CHECK_LATENCY = 30 * 1000;
49
50 private Timer _timer;
51 private final HelixManager _manager;
52 private final Pipeline _healthStatsAggregationPipeline;
53 private final int _delay;
54 private final int _period;
55 private final ClusterAlertMBeanCollection _alertItemCollection;
56 private final Map<String, HelixStageLatencyMonitor> _stageLatencyMonitorMap =
57 new HashMap<String, HelixStageLatencyMonitor>();
58
59 public HealthStatsAggregationTask(HelixManager manager, int delay, int period)
60 {
61 _manager = manager;
62 _delay = delay;
63 _period = period;
64
65
66 _healthStatsAggregationPipeline = new Pipeline();
67 _healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
68 StatsAggregationStage statAggregationStage = new StatsAggregationStage();
69 _healthStatsAggregationPipeline.addStage(statAggregationStage);
70 _alertItemCollection = statAggregationStage.getClusterAlertMBeanCollection();
71
72 registerStageLatencyMonitor(_healthStatsAggregationPipeline);
73 }
74
75 public HealthStatsAggregationTask(HelixManager manager)
76 {
77 this(manager, DEFAULT_HEALTH_CHECK_LATENCY, DEFAULT_HEALTH_CHECK_LATENCY);
78 }
79
80 private void registerStageLatencyMonitor(Pipeline pipeline)
81 {
82 for (Stage stage : pipeline.getStages())
83 {
84 String stgName = stage.getStageName();
85 if (!_stageLatencyMonitorMap.containsKey(stgName))
86 {
87 try
88 {
89 _stageLatencyMonitorMap.put(stage.getStageName(),
90 new HelixStageLatencyMonitor(_manager.getClusterName(),
91 stgName));
92 }
93 catch (Exception e)
94 {
95 LOG.error("Couldn't create StageLatencyMonitor mbean for stage: " + stgName, e);
96 }
97 }
98 else
99 {
100 LOG.error("StageLatencyMonitor for stage: " + stgName
101 + " already exists. Skip register it");
102 }
103 }
104 }
105
106 @Override
107 public void start()
108 {
109 LOG.info("START HealthAggregationTask");
110
111 if (_timer == null)
112 {
113
114 HelixDataAccessor accessor = _manager.getHelixDataAccessor();
115 List<String> existingHealthRecordNames = accessor.getChildNames(accessor.keyBuilder().healthReports(_manager.getInstanceName()));
116 for(String healthReportName : existingHealthRecordNames)
117 {
118 LOG.info("Removing old healthrecord " + healthReportName);
119 accessor.removeProperty(accessor.keyBuilder().healthReport(_manager.getInstanceName(),healthReportName));
120 }
121
122 _timer = new Timer(true);
123 _timer.scheduleAtFixedRate(this, new Random().nextInt(_delay), _period);
124 }
125 else
126 {
127 LOG.warn("timer already started");
128 }
129 }
130
131 @Override
132 public synchronized void stop()
133 {
134 LOG.info("Stop HealthAggregationTask");
135
136 if (_timer != null)
137 {
138 _timer.cancel();
139 _timer = null;
140 _alertItemCollection.reset();
141
142 for (HelixStageLatencyMonitor stgLatencyMonitor : _stageLatencyMonitorMap.values())
143 {
144 stgLatencyMonitor.reset();
145 }
146 }
147 else
148 {
149 LOG.warn("timer already stopped");
150 }
151 }
152
153 @Override
154 public synchronized void run()
155 {
156 if (!isEnabled())
157 {
158 LOG.info("HealthAggregationTask is disabled.");
159 return;
160 }
161
162 if (!_manager.isLeader())
163 {
164 LOG.error("Cluster manager: " + _manager.getInstanceName()
165 + " is not leader. Pipeline will not be invoked");
166 return;
167 }
168
169 try
170 {
171 ClusterEvent event = new ClusterEvent("healthChange");
172 event.addAttribute("helixmanager", _manager);
173 event.addAttribute("HelixStageLatencyMonitorMap", _stageLatencyMonitorMap);
174
175 _healthStatsAggregationPipeline.handle(event);
176 _healthStatsAggregationPipeline.finish();
177 }
178 catch (Exception e)
179 {
180 LOG.error("Exception while executing pipeline: " + _healthStatsAggregationPipeline,
181 e);
182 }
183 }
184
185 private boolean isEnabled()
186 {
187 ConfigAccessor configAccessor = _manager.getConfigAccessor();
188 boolean enabled = true;
189 if (configAccessor != null)
190 {
191
192 ConfigScope scope =
193 new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
194 String isEnabled = configAccessor.get(scope, "healthChange.enabled");
195 if (isEnabled != null)
196 {
197 enabled = new Boolean(isEnabled);
198 }
199 }
200 else
201 {
202 LOG.debug("File-based cluster manager doesn't support disable healthChange");
203 }
204 return enabled;
205 }
206
207 }