1 package org.apache.helix.monitoring;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.util.HashMap;
24 import java.util.Map;
25 import java.util.Set;
26
27 import javax.management.InstanceNotFoundException;
28 import javax.management.MBeanAttributeInfo;
29 import javax.management.MBeanInfo;
30 import javax.management.MBeanServerConnection;
31 import javax.management.MBeanServerNotification;
32 import javax.management.MalformedObjectNameException;
33 import javax.management.ObjectInstance;
34 import javax.management.ObjectName;
35
36 import org.apache.helix.monitoring.ParticipantMonitor;
37 import org.apache.helix.monitoring.StateTransitionContext;
38 import org.apache.helix.monitoring.StateTransitionDataPoint;
39 import org.apache.helix.monitoring.mbeans.ClusterMBeanObserver;
40 import org.apache.log4j.Logger;
41 import org.testng.AssertJUnit;
42 import org.testng.annotations.Test;
43
44
45 public class TestParticipantMonitor
46 {
47 static Logger _logger = Logger.getLogger(TestParticipantMonitor.class);
48
49 class ParticipantMonitorListener extends ClusterMBeanObserver
50 {
51 Map<String, Map<String, Object>> _beanValueMap = new HashMap<String, Map<String, Object>>();
52
53 public ParticipantMonitorListener(String domain)
54 throws InstanceNotFoundException, IOException,
55 MalformedObjectNameException, NullPointerException
56 {
57 super(domain);
58 init();
59 }
60
61 void init()
62 {
63 try
64 {
65 Set<ObjectInstance> existingInstances = _server.queryMBeans(new ObjectName(_domain+":Cluster=cluster,*"), null);
66 for(ObjectInstance instance : existingInstances)
67 {
68 String mbeanName = instance.getObjectName().toString();
69
70 addMBean(instance.getObjectName());
71 }
72 }
73 catch (Exception e)
74 {
75 _logger.warn("fail to get all existing mbeans in " + _domain, e);
76 }
77 }
78
79 @Override
80 public void onMBeanRegistered(MBeanServerConnection server,
81 MBeanServerNotification mbsNotification)
82 {
83 addMBean(mbsNotification.getMBeanName());
84 }
85
86 void addMBean(ObjectName beanName)
87 {
88 try
89 {
90 MBeanInfo info = _server.getMBeanInfo(beanName);
91 MBeanAttributeInfo[] infos = info.getAttributes();
92 _beanValueMap.put(beanName.toString(), new HashMap<String, Object>());
93 for(MBeanAttributeInfo infoItem : infos)
94 {
95 Object val = _server.getAttribute(beanName, infoItem.getName());
96
97 _beanValueMap.get(beanName.toString()).put(infoItem.getName(), val);
98 }
99 }
100 catch (Exception e)
101 {
102 _logger.error("Error getting bean info, domain="+_domain, e);
103 }
104 }
105
106 @Override
107 public void onMBeanUnRegistered(MBeanServerConnection server,
108 MBeanServerNotification mbsNotification)
109 {
110
111 }
112 }
113 @Test(groups={ "unitTest" })
114 public void TestReportData() throws InstanceNotFoundException, MalformedObjectNameException, NullPointerException, IOException, InterruptedException
115 {
116 System.out.println("START TestParticipantMonitor");
117 ParticipantMonitor monitor = new ParticipantMonitor();
118
119 int monitorNum = 0;
120
121 StateTransitionContext cxt = new StateTransitionContext("cluster", "instance", "db_1","a-b");
122 StateTransitionDataPoint data = new StateTransitionDataPoint(1000,1000,true);
123 monitor.reportTransitionStat(cxt, data);
124
125 data = new StateTransitionDataPoint(1000,1200,true);
126 monitor.reportTransitionStat(cxt, data);
127
128 ParticipantMonitorListener monitorListener = new ParticipantMonitorListener("CLMParticipantReport");
129 Thread.sleep(1000);
130 AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 1);
131
132 data = new StateTransitionDataPoint(1000,500,true);
133 monitor.reportTransitionStat(cxt, data);
134 Thread.sleep(1000);
135 AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 1);
136
137 data = new StateTransitionDataPoint(1000,500,true);
138 StateTransitionContext cxt2 = new StateTransitionContext("cluster", "instance", "db_2","a-b");
139 monitor.reportTransitionStat(cxt2, data);
140 monitor.reportTransitionStat(cxt2, data);
141 Thread.sleep(1000);
142 AssertJUnit.assertTrue(monitorListener._beanValueMap.size() == monitorNum + 2);
143
144 AssertJUnit.assertFalse(cxt.equals(cxt2));
145 AssertJUnit.assertFalse(cxt.equals(new Object()));
146 AssertJUnit.assertTrue(cxt.equals(new StateTransitionContext("cluster", "instance", "db_1","a-b")));
147
148 cxt2.getInstanceName();
149
150 ParticipantMonitorListener monitorListener2 = new ParticipantMonitorListener("CLMParticipantReport");
151
152 Thread.sleep(1000);
153 AssertJUnit.assertEquals(monitorListener2._beanValueMap.size() , monitorNum + 2);
154
155 monitorListener2.disconnect();
156 monitorListener.disconnect();
157 System.out.println("END TestParticipantMonitor");
158 }
159 }