1 package org.apache.helix.tools;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.BufferedReader;
23 import java.io.BufferedWriter;
24 import java.io.File;
25 import java.io.FileInputStream;
26 import java.io.FileNotFoundException;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.io.InputStreamReader;
30 import java.io.OutputStreamWriter;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34
35 import org.apache.helix.ZNRecord;
36 import org.apache.helix.manager.zk.ZNRecordSerializer;
37 import org.apache.helix.model.IdealState.IdealStateProperty;
38 import org.apache.helix.util.HelixUtil;
39
40
41 public class ZkLogCSVFormatter
42 {
43 private static final ZNRecordSerializer _deserializer = new ZNRecordSerializer();
44 private static String _fieldDelim = ",";
45
46
47
48
49 public static void main(String[] args) throws Exception
50 {
51 if (args.length != 2)
52 {
53 System.err.println("USAGE: ZkLogCSVFormatter log_file output_dir");
54 System.exit(2);
55 }
56 File outputDir = new File(args[1]);
57 if (!outputDir.exists() || !outputDir.isDirectory())
58 {
59 System.err.println(outputDir.getAbsolutePath() + " does NOT exist or is NOT a directory");
60 System.exit(2);
61 }
62 format(args[0], args[1]);
63 }
64
65 private static void formatter(BufferedWriter bw, String... args)
66 {
67 StringBuffer sb = new StringBuffer();
68
69 if (args.length == 0)
70 {
71 return;
72 }
73 else
74 {
75 sb.append(args[0]);
76 for (int i = 1; i < args.length; i++)
77 {
78 sb.append(_fieldDelim).append(args[i]);
79 }
80 }
81
82 try
83 {
84 bw.write(sb.toString());
85 bw.newLine();
86
87 }
88 catch (IOException e)
89 {
90
91 e.printStackTrace();
92 }
93 }
94
95 private static String getAttributeValue(String line, String attribute)
96 {
97 String[] parts = line.split("\\s");
98 if (parts != null && parts.length > 0)
99 {
100 for (int i = 0; i < parts.length; i++)
101 {
102 if (parts[i].startsWith(attribute))
103 {
104 String val = parts[i].substring(attribute.length());
105 return val;
106 }
107 }
108 }
109 return null;
110 }
111
112 private static void format(String logfilepath, String outputDir) throws FileNotFoundException
113 {
114 try
115 {
116
117 FileInputStream fis = new FileInputStream(logfilepath);
118 BufferedReader br = new BufferedReader(new InputStreamReader(fis));
119
120
121 FileOutputStream isFos = new FileOutputStream(outputDir + "/" + "idealState.csv");
122 BufferedWriter isBw = new BufferedWriter(new OutputStreamWriter(isFos));
123
124 FileOutputStream cfgFos = new FileOutputStream(outputDir + "/" + "config.csv");
125 BufferedWriter cfgBw = new BufferedWriter(new OutputStreamWriter(cfgFos));
126
127 FileOutputStream evFos = new FileOutputStream(outputDir + "/" + "externalView.csv");
128 BufferedWriter evBw = new BufferedWriter(new OutputStreamWriter(evFos));
129
130 FileOutputStream smdCntFos =
131 new FileOutputStream(outputDir + "/" + "stateModelDefStateCount.csv");
132 BufferedWriter smdCntBw = new BufferedWriter(new OutputStreamWriter(smdCntFos));
133
134 FileOutputStream smdNextFos =
135 new FileOutputStream(outputDir + "/" + "stateModelDefStateNext.csv");
136 BufferedWriter smdNextBw = new BufferedWriter(new OutputStreamWriter(smdNextFos));
137
138 FileOutputStream csFos = new FileOutputStream(outputDir + "/" + "currentState.csv");
139 BufferedWriter csBw = new BufferedWriter(new OutputStreamWriter(csFos));
140
141 FileOutputStream msgFos = new FileOutputStream(outputDir + "/" + "messages.csv");
142 BufferedWriter msgBw = new BufferedWriter(new OutputStreamWriter(msgFos));
143
144 FileOutputStream hrPerfFos = new FileOutputStream(outputDir + "/" + "healthReportDefaultPerfCounters.csv");
145 BufferedWriter hrPerfBw = new BufferedWriter(new OutputStreamWriter(hrPerfFos));
146
147 FileOutputStream liFos =
148 new FileOutputStream(outputDir + "/" + "liveInstances.csv");
149 BufferedWriter liBw = new BufferedWriter(new OutputStreamWriter(liFos));
150
151 formatter(cfgBw, "timestamp", "instanceName", "host", "port", "enabled");
152 formatter(isBw,
153 "timestamp",
154 "resourceName",
155 "partitionNumber",
156 "mode",
157 "partition",
158 "instanceName",
159 "priority");
160 formatter(evBw, "timestamp", "resourceName", "partition", "instanceName", "state");
161 formatter(smdCntBw, "timestamp", "stateModel", "state", "count");
162 formatter(smdNextBw, "timestamp", "stateModel", "from", "to", "next");
163 formatter(liBw, "timestamp", "instanceName", "sessionId", "Operation");
164 formatter(csBw,
165 "timestamp",
166 "resourceName",
167 "partition",
168 "instanceName",
169 "sessionId",
170 "state");
171 formatter(msgBw,
172 "timestamp",
173 "resourceName",
174 "partition",
175 "instanceName",
176 "sessionId",
177 "from",
178 "to",
179 "messageType",
180 "messageState");
181 formatter(hrPerfBw,
182 "timestamp",
183 "instanceName",
184 "availableCPUs",
185 "averageSystemLoad",
186 "freeJvmMemory",
187 "freePhysicalMemory",
188 "totalJvmMemory");
189
190 Map<String, ZNRecord> liveInstanceSessionMap = new HashMap<String, ZNRecord>();
191
192 int pos;
193 String inputLine;
194 while ((inputLine = br.readLine()) != null)
195 {
196 if (inputLine.indexOf("CONFIGS") != -1)
197 {
198 pos = inputLine.indexOf("CONFIGS");
199 pos = inputLine.indexOf("data:{", pos);
200 if (pos != -1)
201 {
202 String timestamp = getAttributeValue(inputLine, "time:");
203 ZNRecord record =
204 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
205 .getBytes());
206
207 formatter(cfgBw,
208 timestamp,
209 record.getId(),
210 record.getSimpleField("HOST"),
211 record.getSimpleField("PORT"),
212 record.getSimpleField("ENABLED"));
213
214 }
215 }
216 else if (inputLine.indexOf("IDEALSTATES") != -1)
217 {
218 pos = inputLine.indexOf("IDEALSTATES");
219 pos = inputLine.indexOf("data:{", pos);
220 if (pos != -1)
221 {
222 String timestamp = getAttributeValue(inputLine, "time:");
223 ZNRecord record =
224 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
225 .getBytes());
226
227 for (String partition : record.getListFields().keySet())
228 {
229 List<String> preferenceList = record.getListFields().get(partition);
230 for (int i = 0; i < preferenceList.size(); i++)
231 {
232 String instance = preferenceList.get(i);
233 formatter(isBw,
234 timestamp,
235 record.getId(),
236 record.getSimpleField(IdealStateProperty.NUM_PARTITIONS.toString()),
237 record.getSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString()),
238 partition,
239 instance,
240 Integer.toString(i));
241 }
242 }
243 }
244 }
245 else if (inputLine.indexOf("LIVEINSTANCES") != -1)
246 {
247 pos = inputLine.indexOf("LIVEINSTANCES");
248 pos = inputLine.indexOf("data:{", pos);
249 if (pos != -1)
250 {
251 String timestamp = getAttributeValue(inputLine, "time:");
252 ZNRecord record =
253 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
254 .getBytes());
255 formatter(liBw, timestamp, record.getId(), record.getSimpleField("SESSION_ID"), "ADD");
256 String zkSessionId = getAttributeValue(inputLine, "session:");
257 if (zkSessionId == null)
258 {
259 System.err.println("no zk session id associated with the adding of live instance: "
260 + inputLine);
261 }
262 else
263 {
264 liveInstanceSessionMap.put(zkSessionId, record);
265 }
266 }
267
268 }
269 else if (inputLine.indexOf("EXTERNALVIEW") != -1)
270 {
271 pos = inputLine.indexOf("EXTERNALVIEW");
272 pos = inputLine.indexOf("data:{", pos);
273 if (pos != -1)
274 {
275 String timestamp = getAttributeValue(inputLine, "time:");
276 ZNRecord record =
277 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
278 .getBytes());
279
280 for (String partition : record.getMapFields().keySet())
281 {
282 Map<String, String> stateMap = record.getMapFields().get(partition);
283 for (String instance : stateMap.keySet())
284 {
285 String state = stateMap.get(instance);
286 formatter(evBw, timestamp, record.getId(), partition, instance, state);
287 }
288 }
289 }
290 }
291 else if (inputLine.indexOf("STATEMODELDEFS") != -1)
292 {
293 pos = inputLine.indexOf("STATEMODELDEFS");
294 pos = inputLine.indexOf("data:{", pos);
295 if (pos != -1)
296 {
297 String timestamp = getAttributeValue(inputLine, "time:");
298 ZNRecord record =
299 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
300 .getBytes());
301
302 for (String stateInfo : record.getMapFields().keySet())
303 {
304 if (stateInfo.endsWith(".meta"))
305 {
306 Map<String, String> metaMap = record.getMapFields().get(stateInfo);
307 formatter(smdCntBw,
308 timestamp,
309 record.getId(),
310 stateInfo.substring(0, stateInfo.indexOf('.')),
311 metaMap.get("count"));
312 }
313 else if (stateInfo.endsWith(".next"))
314 {
315 Map<String, String> nextMap = record.getMapFields().get(stateInfo);
316 for (String destState : nextMap.keySet())
317 {
318 formatter(smdNextBw,
319 timestamp,
320 record.getId(),
321 stateInfo.substring(0, stateInfo.indexOf('.')),
322 destState,
323 nextMap.get(destState));
324 }
325 }
326 }
327 }
328 }
329 else if (inputLine.indexOf("CURRENTSTATES") != -1)
330 {
331 pos = inputLine.indexOf("CURRENTSTATES");
332 pos = inputLine.indexOf("data:{", pos);
333 if (pos != -1)
334 {
335 String timestamp = getAttributeValue(inputLine, "time:");
336 ZNRecord record =
337 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
338 .getBytes());
339
340 for (String partition : record.getMapFields().keySet())
341 {
342 Map<String, String> stateMap = record.getMapFields().get(partition);
343 String path = getAttributeValue(inputLine, "path:");
344 if (path != null)
345 {
346 String instance = HelixUtil.getInstanceNameFromPath(path);
347 formatter(csBw,
348 timestamp,
349 record.getId(),
350 partition,
351 instance,
352 record.getSimpleField("SESSION_ID"),
353 stateMap.get("CURRENT_STATE"));
354 }
355 }
356 }
357 }
358 else if (inputLine.indexOf("MESSAGES") != -1)
359 {
360 pos = inputLine.indexOf("MESSAGES");
361 pos = inputLine.indexOf("data:{", pos);
362 if (pos != -1)
363 {
364 String timestamp = getAttributeValue(inputLine, "time:");
365 ZNRecord record =
366 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
367 .getBytes());
368
369 formatter(msgBw,
370 timestamp,
371 record.getSimpleField("RESOURCE_NAME"),
372 record.getSimpleField("PARTITION_NAME"),
373 record.getSimpleField("TGT_NAME"),
374 record.getSimpleField("TGT_SESSION_ID"),
375 record.getSimpleField("FROM_STATE"),
376 record.getSimpleField("TO_STATE"),
377 record.getSimpleField("MSG_TYPE"),
378 record.getSimpleField("MSG_STATE"));
379 }
380
381 }
382 else if (inputLine.indexOf("closeSession") != -1)
383 {
384 String zkSessionId = getAttributeValue(inputLine, "session:");
385 if (zkSessionId == null)
386 {
387 System.err.println("no zk session id associated with the closing of zk session: "
388 + inputLine);
389 }
390 else
391 {
392 ZNRecord record = liveInstanceSessionMap.remove(zkSessionId);
393
394 if (record != null)
395 {
396 String timestamp = getAttributeValue(inputLine, "time:");
397 formatter(liBw,
398 timestamp,
399 record.getId(),
400 record.getSimpleField("SESSION_ID"),
401 "DELETE");
402 }
403 }
404 }
405 else if (inputLine.indexOf("HEALTHREPORT/defaultPerfCounters") != -1)
406 {
407 pos = inputLine.indexOf("HEALTHREPORT/defaultPerfCounters");
408 pos = inputLine.indexOf("data:{", pos);
409 if (pos != -1)
410 {
411 String timestamp = getAttributeValue(inputLine, "time:");
412 ZNRecord record =
413 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
414 .getBytes());
415
416 String path = getAttributeValue(inputLine, "path:");
417 if (path != null)
418 {
419 String instance = HelixUtil.getInstanceNameFromPath(path);
420 formatter(hrPerfBw,
421 timestamp,
422 instance,
423 record.getSimpleField("availableCPUs"),
424 record.getSimpleField("averageSystemLoad"),
425 record.getSimpleField("freeJvmMemory"),
426 record.getSimpleField("freePhysicalMemory"),
427 record.getSimpleField("totalJvmMemory"));
428 }
429 }
430 }
431 }
432
433 br.close();
434 isBw.close();
435 cfgBw.close();
436 evBw.close();
437 smdCntBw.close();
438 smdNextBw.close();
439 csBw.close();
440 msgBw.close();
441 liBw.close();
442 hrPerfBw.close();
443 }
444 catch (Exception e)
445 {
446 System.err.println("Error: " + e.getMessage());
447 }
448 }
449 }