1 package org.apache.helix.monitoring.mbeans;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.lang.management.ManagementFactory;
23 import java.util.Map;
24 import java.util.TreeMap;
25 import java.util.concurrent.ConcurrentHashMap;
26
27 import javax.management.MBeanServer;
28 import javax.management.MalformedObjectNameException;
29 import javax.management.ObjectName;
30
31 import org.apache.helix.model.ExternalView;
32 import org.apache.helix.model.IdealState;
33 import org.apache.log4j.Logger;
34
35
36 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean
37 {
38 private static final Logger LOG =
39 Logger.getLogger(ClusterStatusMonitor.class);
40
41 static final String CLUSTER_STATUS_KEY =
42 "ClusterStatus";
43 static final String MESSAGE_QUEUE_STATUS_KEY =
44 "MessageQueueStatus";
45 static final String RESOURCE_STATUS_KEY =
46 "ResourceStatus";
47 static final String CLUSTER_DN_KEY =
48 "cluster";
49 static final String RESOURCE_DN_KEY =
50 "resourceName";
51 static final String INSTANCE_DN_KEY =
52 "instanceName";
53
54 private final String _clusterName;
55 private final MBeanServer _beanServer;
56
57 private int _numOfLiveInstances =
58 0;
59 private int _numOfInstances =
60 0;
61 private int _numOfDisabledInstances =
62 0;
63 private int _numOfDisabledPartitions =
64 0;
65
66 private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap =
67 new ConcurrentHashMap<String, ResourceMonitor>();
68
69 private final ConcurrentHashMap<String, MessageQueueMonitor> _instanceMsgQueueMbeanMap =
70 new ConcurrentHashMap<String, MessageQueueMonitor>();
71
72 public ClusterStatusMonitor(String clusterName)
73 {
74 _clusterName = clusterName;
75 _beanServer = ManagementFactory.getPlatformMBeanServer();
76 try
77 {
78 register(this, getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
79 }
80 catch (Exception e)
81 {
82 LOG.error("Register self failed.", e);
83 }
84 }
85
86 public ObjectName getObjectName(String name) throws MalformedObjectNameException
87 {
88 return new ObjectName(CLUSTER_STATUS_KEY + ": " + name);
89 }
90
91
92 public String getBeanName()
93 {
94 return CLUSTER_STATUS_KEY + " " + _clusterName;
95 }
96
97 @Override
98 public long getDownInstanceGauge()
99 {
100 return _numOfInstances - _numOfLiveInstances;
101 }
102
103 @Override
104 public long getInstancesGauge()
105 {
106 return _numOfInstances;
107 }
108
109 @Override
110 public long getDisabledInstancesGauge()
111 {
112 return _numOfDisabledInstances;
113 }
114
115 @Override
116 public long getDisabledPartitionsGauge()
117 {
118 return _numOfDisabledPartitions;
119 }
120
121 @Override
122 public long getMaxMessageQueueSizeGauge()
123 {
124 long maxQueueSize = 0;
125 for (MessageQueueMonitor msgQueue : _instanceMsgQueueMbeanMap.values())
126 {
127 if (msgQueue.getMaxMessageQueueSize() > maxQueueSize)
128 {
129 maxQueueSize = (long)msgQueue.getMaxMessageQueueSize();
130 }
131 }
132
133 return maxQueueSize;
134 }
135
136 @Override
137 public String getMessageQueueSizes()
138 {
139 Map<String, Long> msgQueueSizes = new TreeMap<String, Long>();
140 for (String instance : _instanceMsgQueueMbeanMap.keySet())
141 {
142 MessageQueueMonitor msgQueue = _instanceMsgQueueMbeanMap.get(instance);
143 msgQueueSizes.put(instance, new Long( (long)msgQueue.getMaxMessageQueueSize()));
144 }
145
146 return msgQueueSizes.toString();
147 }
148
149 private void register(Object bean, ObjectName name)
150 {
151 try
152 {
153 if (_beanServer.isRegistered(name))
154 {
155 _beanServer.unregisterMBean(name);
156 }
157 }
158 catch (Exception e)
159 {
160
161 }
162
163 try
164 {
165 LOG.info("Registering " + name.toString());
166 _beanServer.registerMBean(bean, name);
167 }
168 catch (Exception e)
169 {
170 LOG.warn("Could not register MBean" + name, e);
171 }
172 }
173
174 private void unregister(ObjectName name)
175 {
176 try
177 {
178 if (_beanServer.isRegistered(name))
179 {
180 LOG.info("Unregistering " + name.toString());
181 _beanServer.unregisterMBean(name);
182 }
183 }
184 catch (Exception e)
185 {
186 LOG.warn("Could not unregister MBean" + name, e);
187 }
188 }
189
190 public void setClusterStatusCounters(int numberLiveInstances,
191 int numberOfInstances,
192 int disabledInstances,
193 int disabledPartitions)
194 {
195 _numOfInstances = numberOfInstances;
196 _numOfLiveInstances = numberLiveInstances;
197 _numOfDisabledInstances = disabledInstances;
198 _numOfDisabledPartitions = disabledPartitions;
199 }
200
201 public void onExternalViewChange(ExternalView externalView, IdealState idealState)
202 {
203 try
204 {
205 String resourceName = externalView.getId();
206 if (!_resourceMbeanMap.containsKey(resourceName))
207 {
208 synchronized (this)
209 {
210 if (!_resourceMbeanMap.containsKey(resourceName))
211 {
212 ResourceMonitor bean = new ResourceMonitor(_clusterName, resourceName);
213 String beanName =
214 CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "="
215 + resourceName;
216 register(bean, getObjectName(beanName));
217 _resourceMbeanMap.put(resourceName, bean);
218 }
219 }
220 }
221 _resourceMbeanMap.get(resourceName).updateExternalView(externalView, idealState);
222 }
223 catch (Exception e)
224 {
225 LOG.warn(e);
226 }
227 }
228
229 public void addMessageQueueSize(String instanceName, int msgQueueSize)
230 {
231 try
232 {
233 if (!_instanceMsgQueueMbeanMap.containsKey(instanceName))
234 {
235 synchronized (this)
236 {
237 if (!_instanceMsgQueueMbeanMap.containsKey(instanceName))
238 {
239 MessageQueueMonitor bean =
240 new MessageQueueMonitor(_clusterName, instanceName);
241 _instanceMsgQueueMbeanMap.put(instanceName, bean);
242 }
243 }
244 }
245 _instanceMsgQueueMbeanMap.get(instanceName).addMessageQueueSize(msgQueueSize);
246 }
247 catch (Exception e)
248 {
249 LOG.warn("fail to add message queue size to mbean", e);
250 }
251 }
252
253 public void reset()
254 {
255 LOG.info("Resetting ClusterStatusMonitor");
256 try
257 {
258 for (String resourceName : _resourceMbeanMap.keySet())
259 {
260 String beanName =
261 CLUSTER_DN_KEY + "=" + _clusterName + "," + RESOURCE_DN_KEY + "="
262 + resourceName;
263 unregister(getObjectName(beanName));
264 }
265 _resourceMbeanMap.clear();
266
267 for (MessageQueueMonitor bean : _instanceMsgQueueMbeanMap.values())
268 {
269 bean.reset();
270 }
271 _instanceMsgQueueMbeanMap.clear();
272
273 unregister(getObjectName(CLUSTER_DN_KEY + "=" + _clusterName));
274 }
275 catch (Exception e)
276 {
277 LOG.error("fail to reset ClusterStatusMonitor", e);
278 }
279 }
280
281 @Override
282 public String getSensorName()
283 {
284 return CLUSTER_STATUS_KEY + "_" + _clusterName;
285 }
286
287 }