1 package org.apache.helix.monitoring;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.StringWriter;
23 import java.util.Date;
24 import java.util.List;
25 import java.util.TimerTask;
26
27 import org.apache.helix.HelixDataAccessor;
28 import org.apache.helix.HelixManager;
29 import org.apache.helix.PropertyType;
30 import org.apache.helix.ZNRecord;
31 import org.apache.helix.PropertyKey.Builder;
32 import org.apache.helix.manager.zk.ZNRecordSerializer;
33 import org.apache.helix.manager.zk.ZkClient;
34 import org.apache.helix.util.HelixUtil;
35 import org.apache.log4j.Logger;
36 import org.apache.zookeeper.data.Stat;
37 import org.codehaus.jackson.map.ObjectMapper;
38 import org.codehaus.jackson.map.SerializationConfig;
39
40
41 public class ZKPathDataDumpTask extends TimerTask
42 {
43 static Logger logger = Logger.getLogger(ZKPathDataDumpTask.class);
44
45 private final int _thresholdNoChangeInMs;
46 private final HelixManager _manager;
47 private final ZkClient _zkClient;
48
49 public ZKPathDataDumpTask(HelixManager manager, ZkClient zkClient, int thresholdNoChangeInMs)
50 {
51 _manager = manager;
52 _zkClient = zkClient;
53 logger.info("Scannning cluster statusUpdate " + manager.getClusterName()
54 + " thresholdNoChangeInMs: " + thresholdNoChangeInMs);
55 _thresholdNoChangeInMs = thresholdNoChangeInMs;
56 }
57
58 @Override
59 public void run()
60 {
61
62
63
64
65
66 logger.info("Scannning status updates ...");
67 try
68 {
69 HelixDataAccessor accessor = _manager.getHelixDataAccessor();
70 Builder keyBuilder = accessor.keyBuilder();
71
72 List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
73 for (String instanceName : instances)
74 {
75 scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
76 PropertyType.STATUSUPDATES), _thresholdNoChangeInMs);
77 scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
78 PropertyType.ERRORS), _thresholdNoChangeInMs * 3);
79 }
80 scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
81 PropertyType.STATUSUPDATES_CONTROLLER), _thresholdNoChangeInMs);
82
83 scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
84 PropertyType.ERRORS_CONTROLLER), _thresholdNoChangeInMs * 3);
85 } catch (Exception e)
86 {
87 logger.error(e);
88 }
89 }
90
91 void scanPath(String path, int thresholdNoChangeInMs)
92 {
93 logger.info("Scannning path " + path);
94 List<String> subPaths = _zkClient.getChildren(path);
95 for (String subPath : subPaths)
96 {
97 try
98 {
99 String nextPath = path + "/" + subPath;
100 List<String> subSubPaths = _zkClient.getChildren(nextPath);
101 for (String subsubPath : subSubPaths)
102 {
103 try
104 {
105 checkAndDump(nextPath + "/" + subsubPath, thresholdNoChangeInMs);
106 } catch (Exception e)
107 {
108 logger.error(e);
109 }
110 }
111 } catch (Exception e)
112 {
113 logger.error(e);
114 }
115 }
116 }
117
118 void checkAndDump(String path, int thresholdNoChangeInMs)
119 {
120 List<String> subPaths = _zkClient.getChildren(path);
121 if(subPaths.size() == 0)
122 {
123 subPaths.add("");
124 }
125 for (String subPath : subPaths)
126 {
127 String fullPath = subPath.length() > 0 ? path + "/" + subPath : path;
128 Stat pathStat = _zkClient.getStat(fullPath);
129
130 long lastModifiedTimeInMs = pathStat.getMtime();
131 long nowInMs = new Date().getTime();
132
133
134
135 if (nowInMs > lastModifiedTimeInMs)
136 {
137 long timeDiff = nowInMs - lastModifiedTimeInMs;
138 if (timeDiff > thresholdNoChangeInMs)
139 {
140 logger.info("Dumping status update path " + fullPath + " " + timeDiff + "MS has passed");
141 _zkClient.setZkSerializer(new ZNRecordSerializer());
142 ZNRecord record = _zkClient.readData(fullPath);
143
144
145 ObjectMapper mapper = new ObjectMapper();
146 SerializationConfig serializationConfig = mapper.getSerializationConfig();
147 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
148
149 StringWriter sw = new StringWriter();
150 try
151 {
152 mapper.writeValue(sw, record);
153 logger.info(sw.toString());
154 } catch (Exception e)
155 {
156 logger.warn(
157 "Exception during serialization in ZKPathDataDumpTask.checkAndDump. This can mostly be ignored",
158 e);
159 }
160
161 _zkClient.deleteRecursive(fullPath);
162 }
163 }
164 }
165 }
166 }