View Javadoc

1   package org.apache.helix.monitoring.mbeans;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.lang.management.ManagementFactory;
23  import java.util.Map;
24  import java.util.TreeMap;
25  import java.util.concurrent.ConcurrentHashMap;
26  
27  import javax.management.MBeanServer;
28  import javax.management.MalformedObjectNameException;
29  import javax.management.ObjectName;
30  
31  import org.apache.helix.model.ExternalView;
32  import org.apache.helix.model.IdealState;
33  import org.apache.log4j.Logger;
34  
35  
36  public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
37  {
38    private static final Logger                                  LOG                       =
39                                                                                               Logger.getLogger(ClusterStatusMonitor.class);
40  
41    static final String                                          CLUSTER_STATUS_KEY        =
42                                                                                               "ClusterStatus";
43    static final String                                          MESSAGE_QUEUE_STATUS_KEY  =
44                                                                                               "MessageQueueStatus";
45    static final String                                          RESOURCE_STATUS_KEY       =
46                                                                                               "ResourceStatus";
47    static final String                                          CLUSTER_DN_KEY            =
48                                                                                               "cluster";
49    static final String                                          RESOURCE_DN_KEY           =
50                                                                                               "resourceName";
51    static final String                                          INSTANCE_DN_KEY           =
52                                                                                               "instanceName";
53  
54    private final String                                         _clusterName;
55    private final MBeanServer                                    _beanServer;
56  
57    private int                                                  _numOfLiveInstances       =
58                                                                                               0;
59    private int                                                  _numOfInstances           =
60                                                                                               0;
61    private int                                                  _numOfDisabledInstances   =
62                                                                                               0;
63    private int                                                  _numOfDisabledPartitions  =
64                                                                                               0;
65  
66    private final ConcurrentHashMap<String, ResourceMonitor>     _resourceMbeanMap         =
67                                                                                               new ConcurrentHashMap<String, ResourceMonitor>();
68  
69    private final ConcurrentHashMap<String, MessageQueueMonitor> _instanceMsgQueueMbeanMap =
70                                                                                               new ConcurrentHashMap<String, MessageQueueMonitor>();
71  
72    public ClusterStatusMonitor(String clusterName)
73    {
74      _clusterName = clusterName;
75      _beanServer = ManagementFactory.getPlatformMBeanServer();
76      try
77      {
78        register(this, getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
79      }
80      catch (Exception e)
81      {
82        LOG.error("Register self failed.", e);
83      }
84    }
85  
86    public ObjectName getObjectName(String name) throws MalformedObjectNameException
87    {
88      return new ObjectName(CLUSTER_STATUS_KEY + ": " + name);
89    }
90  
91    // Used by other external JMX consumers like ingraph
92    public String getBeanName()
93    {
94      return CLUSTER_STATUS_KEY + " " + _clusterName;
95    }
96  
97    @Override
98    public long getDownInstanceGauge()
99    {
100     return _numOfInstances - _numOfLiveInstances;
101   }
102 
103   @Override
104   public long getInstancesGauge()
105   {
106     return _numOfInstances;
107   }
108 
109   @Override
110   public long getDisabledInstancesGauge()
111   {
112     return _numOfDisabledInstances;
113   }
114 
115   @Override
116   public long getDisabledPartitionsGauge()
117   {
118     return _numOfDisabledPartitions;
119   }
120 
121   @Override
122   public long getMaxMessageQueueSizeGauge()
123   {
124     long maxQueueSize = 0;
125     for (MessageQueueMonitor msgQueue : _instanceMsgQueueMbeanMap.values())
126     {
127       if (msgQueue.getMaxMessageQueueSize() > maxQueueSize)
128       {
129         maxQueueSize = (long)msgQueue.getMaxMessageQueueSize();
130       }
131     }
132     
133     return maxQueueSize;
134   }
135 
136   @Override
137   public String getMessageQueueSizes()
138   {
139     Map<String, Long> msgQueueSizes = new TreeMap<String, Long>();
140     for (String instance : _instanceMsgQueueMbeanMap.keySet())
141     {
142       MessageQueueMonitor msgQueue = _instanceMsgQueueMbeanMap.get(instance);
143         msgQueueSizes.put(instance, new Long( (long)msgQueue.getMaxMessageQueueSize()));
144     }
145     
146     return msgQueueSizes.toString();
147   }
148 
149   private void register(Object bean, ObjectName name)
150   {
151     try
152     {
153       if (_beanServer.isRegistered(name))
154       {
155         _beanServer.unregisterMBean(name);
156       }
157     }
158     catch (Exception e)
159     {
160       // OK
161     }
162 
163     try
164     {
165       LOG.info("Registering " + name.toString());
166       _beanServer.registerMBean(bean, name);
167     }
168     catch (Exception e)
169     {
170       LOG.warn("Could not register MBean" + name, e);
171     }
172   }
173 
174   private void unregister(ObjectName name)
175   {
176     try
177     {
178       if (_beanServer.isRegistered(name))
179       {
180         LOG.info("Unregistering " + name.toString());
181         _beanServer.unregisterMBean(name);
182       }
183     }
184     catch (Exception e)
185     {
186       LOG.warn("Could not unregister MBean" + name, e);
187     }
188   }
189 
190   public void setClusterStatusCounters(int numberLiveInstances,
191                                        int numberOfInstances,
192                                        int disabledInstances,
193                                        int disabledPartitions)
194   {
195     _numOfInstances = numberOfInstances;
196     _numOfLiveInstances = numberLiveInstances;
197     _numOfDisabledInstances = disabledInstances;
198     _numOfDisabledPartitions = disabledPartitions;
199   }
200 
201   public void onExternalViewChange(ExternalView externalView, IdealState idealState)
202   {
203     try
204     {
205       String resourceName = externalView.getId();
206       if (!_resourceMbeanMap.containsKey(resourceName))
207       {
208         synchronized (this)
209         {
210           if (!_resourceMbeanMap.containsKey(resourceName))
211           {
212             ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
213             String beanName =
214                 CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "="
215                     + resourceName;
216             register(bean, getObjectName(beanName));
217             _resourceMbeanMap.put(resourceName, bean);
218           }
219         }
220       }
221       _resourceMbeanMap.get(resourceName).updateExternalView(externalView, idealState);
222     }
223     catch (Exception e)
224     {
225       LOG.warn(e);
226     }
227   }
228 
229   public void addMessageQueueSize(String instanceName, int msgQueueSize)
230   {
231     try
232     {
233       if (!_instanceMsgQueueMbeanMap.containsKey(instanceName))
234       {
235         synchronized (this)
236         {
237           if (!_instanceMsgQueueMbeanMap.containsKey(instanceName))
238           {
239             MessageQueueMonitor bean =
240                 new MessageQueueMonitor(_clusterName, instanceName);
241             _instanceMsgQueueMbeanMap.put(instanceName, bean);
242           }
243         }
244       }
245       _instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
246     }
247     catch (Exception e)
248     {
249       LOG.warn("fail to add message queue size to mbean", e);
250     }
251   }
252 
253   public void reset()
254   {
255     LOG.info("Resetting ClusterStatusMonitor");
256     try
257     {
258       for (String resourceName : _resourceMbeanMap.keySet())
259       {
260         String beanName =
261             CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "="
262                 + resourceName;
263         unregister(getObjectName(beanName));
264       }
265       _resourceMbeanMap.clear();
266 
267       for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values())
268       {
269         bean.reset();
270       }
271       _instanceMsgQueueMbeanMap.clear();
272 
273       unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
274     }
275     catch (Exception e)
276     {
277       LOG.error("fail to reset ClusterStatusMonitor", e);
278     }
279   }
280 
281   @Override
282   public String getSensorName()
283   {
284     return CLUSTER_STATUS_KEY + "_" + _clusterName;
285   }
286 
287 }