View Javadoc

1   package org.apache.helix.healthcheck;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.LinkedList;
23  import java.util.Map;
24  import java.util.Random;
25  import java.util.Timer;
26  import java.util.TimerTask;
27  
28  import org.apache.helix.HelixDataAccessor;
29  import org.apache.helix.HelixManager;
30  import org.apache.helix.ZNRecord;
31  import org.apache.helix.PropertyKey.Builder;
32  import org.apache.helix.alerts.StatsHolder;
33  import org.apache.helix.model.HealthStat;
34  import org.apache.log4j.Logger;
35  
36  
37  public class ParticipantHealthReportCollectorImpl implements
38      ParticipantHealthReportCollector
39  {
40    private final LinkedList<HealthReportProvider> _healthReportProviderList = new LinkedList<HealthReportProvider>();
41    private Timer _timer;
42    private static final Logger _logger = Logger
43        .getLogger(ParticipantHealthReportCollectorImpl.class);
44    private final HelixManager _helixManager;
45    String _instanceName;
46    public final static int DEFAULT_REPORT_LATENCY = 60 * 1000;
47  
48    public ParticipantHealthReportCollectorImpl(HelixManager helixManager,
49        String instanceName)
50    {
51      _helixManager = helixManager;
52      _instanceName = instanceName;
53      addDefaultHealthCheckInfoProvider();
54    }
55  
56    private void addDefaultHealthCheckInfoProvider()
57    {
58      addHealthReportProvider(new DefaultHealthReportProvider());
59    }
60  
61    public void start()
62    {
63      if (_timer == null)
64      {
65        _timer = new Timer(true);
66        _timer.scheduleAtFixedRate(new HealthCheckInfoReportingTask(),
67            new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
68      }
69      else
70      {
71        _logger.warn("timer already started");
72      }
73    }
74  
75    @Override
76    public void addHealthReportProvider(HealthReportProvider provider)
77    {
78      try
79      {
80        synchronized (_healthReportProviderList)
81        {
82          if (!_healthReportProviderList.contains(provider))
83          {
84            _healthReportProviderList.add(provider);
85          }
86          else
87          {
88            _logger.warn("Skipping a duplicated HealthCheckInfoProvider");
89          }
90        }
91      }
92      catch (Exception e)
93      {
94        _logger.error(e);
95      }
96    }
97  
98    @Override
99    public void removeHealthReportProvider(HealthReportProvider provider)
100   {
101     synchronized (_healthReportProviderList)
102     {
103       if (_healthReportProviderList.contains(provider))
104       {
105         _healthReportProviderList.remove(provider);
106       }
107       else
108       {
109         _logger.warn("Skip removing a non-exist HealthCheckInfoProvider");
110       }
111     }
112   }
113 
114   @Override
115   public void reportHealthReportMessage(ZNRecord healthCheckInfoUpdate)
116   {
117     HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
118     Builder keyBuilder = accessor.keyBuilder();
119 //    accessor.setProperty(
120 //        PropertyType.HEALTHREPORT, healthCheckInfoUpdate, _instanceName,
121 //        healthCheckInfoUpdate.getId());
122     accessor.setProperty(keyBuilder.healthReport(_instanceName, healthCheckInfoUpdate.getId()), 
123                          new HealthStat(healthCheckInfoUpdate));
124 
125   }
126 
127   public void stop()
128   {
129     _logger.info("Stop HealthCheckInfoReportingTask");
130     if (_timer != null)
131     {
132       _timer.cancel();
133       _timer = null;
134     }
135     else
136     {
137       _logger.warn("timer already stopped");
138     }
139   }
140 
141   @Override
142   public synchronized void transmitHealthReports()
143   {
144     synchronized (_healthReportProviderList)
145     {
146       for (HealthReportProvider provider : _healthReportProviderList)
147       {
148         try
149         {
150           Map<String, String> report = provider.getRecentHealthReport();
151           Map<String, Map<String, String>> partitionReport = provider
152               .getRecentPartitionHealthReport();
153           ZNRecord record = new ZNRecord(provider.getReportName());
154           if (report != null)
155           {
156             record.setSimpleFields(report);
157           }
158           if (partitionReport != null)
159           {
160             record.setMapFields(partitionReport);
161           }
162           record.setSimpleField(StatsHolder.TIMESTAMP_NAME, "" + System.currentTimeMillis());
163           
164           HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
165           Builder keyBuilder = accessor.keyBuilder();
166           accessor.setProperty(keyBuilder.healthReport(_instanceName, record.getId()), 
167                                new HealthStat(record));
168 
169 //          _helixManager.getDataAccessor().setProperty(
170 //              PropertyType.HEALTHREPORT, record, _instanceName, record.getId());
171           // reset stats (for now just the partition stats)
172           provider.resetStats();
173         }
174         catch (Exception e)
175         {
176           _logger.error("", e);
177         }
178       }
179     }
180   }
181 
182   class HealthCheckInfoReportingTask extends TimerTask
183   {
184     @Override
185     public void run()
186     {
187       transmitHealthReports();
188     }
189   }
190 }