1 package org.apache.helix.healthcheck;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.LinkedList;
23 import java.util.Map;
24 import java.util.Random;
25 import java.util.Timer;
26 import java.util.TimerTask;
27
28 import org.apache.helix.HelixDataAccessor;
29 import org.apache.helix.HelixManager;
30 import org.apache.helix.ZNRecord;
31 import org.apache.helix.PropertyKey.Builder;
32 import org.apache.helix.alerts.StatsHolder;
33 import org.apache.helix.model.HealthStat;
34 import org.apache.log4j.Logger;
35
36
37 public class ParticipantHealthReportCollectorImpl implements
38 ParticipantHealthReportCollector
39 {
40 private final LinkedList<HealthReportProvider> _healthReportProviderList = new LinkedList<HealthReportProvider>();
41 private Timer _timer;
42 private static final Logger _logger = Logger
43 .getLogger(ParticipantHealthReportCollectorImpl.class);
44 private final HelixManager _helixManager;
45 String _instanceName;
46 public final static int DEFAULT_REPORT_LATENCY = 60 * 1000;
47
48 public ParticipantHealthReportCollectorImpl(HelixManager helixManager,
49 String instanceName)
50 {
51 _helixManager = helixManager;
52 _instanceName = instanceName;
53 addDefaultHealthCheckInfoProvider();
54 }
55
56 private void addDefaultHealthCheckInfoProvider()
57 {
58 addHealthReportProvider(new DefaultHealthReportProvider());
59 }
60
61 public void start()
62 {
63 if (_timer == null)
64 {
65 _timer = new Timer(true);
66 _timer.scheduleAtFixedRate(new HealthCheckInfoReportingTask(),
67 new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
68 }
69 else
70 {
71 _logger.warn("timer already started");
72 }
73 }
74
75 @Override
76 public void addHealthReportProvider(HealthReportProvider provider)
77 {
78 try
79 {
80 synchronized (_healthReportProviderList)
81 {
82 if (!_healthReportProviderList.contains(provider))
83 {
84 _healthReportProviderList.add(provider);
85 }
86 else
87 {
88 _logger.warn("Skipping a duplicated HealthCheckInfoProvider");
89 }
90 }
91 }
92 catch (Exception e)
93 {
94 _logger.error(e);
95 }
96 }
97
98 @Override
99 public void removeHealthReportProvider(HealthReportProvider provider)
100 {
101 synchronized (_healthReportProviderList)
102 {
103 if (_healthReportProviderList.contains(provider))
104 {
105 _healthReportProviderList.remove(provider);
106 }
107 else
108 {
109 _logger.warn("Skip removing a non-exist HealthCheckInfoProvider");
110 }
111 }
112 }
113
114 @Override
115 public void reportHealthReportMessage(ZNRecord healthCheckInfoUpdate)
116 {
117 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
118 Builder keyBuilder = accessor.keyBuilder();
119
120
121
122 accessor.setProperty(keyBuilder.healthReport(_instanceName, healthCheckInfoUpdate.getId()),
123 new HealthStat(healthCheckInfoUpdate));
124
125 }
126
127 public void stop()
128 {
129 _logger.info("Stop HealthCheckInfoReportingTask");
130 if (_timer != null)
131 {
132 _timer.cancel();
133 _timer = null;
134 }
135 else
136 {
137 _logger.warn("timer already stopped");
138 }
139 }
140
141 @Override
142 public synchronized void transmitHealthReports()
143 {
144 synchronized (_healthReportProviderList)
145 {
146 for (HealthReportProvider provider : _healthReportProviderList)
147 {
148 try
149 {
150 Map<String, String> report = provider.getRecentHealthReport();
151 Map<String, Map<String, String>> partitionReport = provider
152 .getRecentPartitionHealthReport();
153 ZNRecord record = new ZNRecord(provider.getReportName());
154 if (report != null)
155 {
156 record.setSimpleFields(report);
157 }
158 if (partitionReport != null)
159 {
160 record.setMapFields(partitionReport);
161 }
162 record.setSimpleField(StatsHolder.TIMESTAMP_NAME, "" + System.currentTimeMillis());
163
164 HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
165 Builder keyBuilder = accessor.keyBuilder();
166 accessor.setProperty(keyBuilder.healthReport(_instanceName, record.getId()),
167 new HealthStat(record));
168
169
170
171
172 provider.resetStats();
173 }
174 catch (Exception e)
175 {
176 _logger.error("", e);
177 }
178 }
179 }
180 }
181
182 class HealthCheckInfoReportingTask extends TimerTask
183 {
184 @Override
185 public void run()
186 {
187 transmitHealthReports();
188 }
189 }
190 }