1 package org.apache.helix.tools;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.BufferedReader;
23 import java.io.File;
24 import java.io.FileFilter;
25 import java.io.FileInputStream;
26 import java.io.InputStreamReader;
27 import java.sql.Timestamp;
28 import java.text.SimpleDateFormat;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Date;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.TreeMap;
36
37 import org.apache.helix.ZNRecord;
38 import org.apache.helix.manager.zk.ZNRecordSerializer;
39 import org.apache.helix.model.LiveInstance;
40 import org.apache.helix.model.Message;
41 import org.apache.helix.model.Message.MessageState;
42 import org.apache.log4j.Logger;
43
44
45 public class ZkLogAnalyzer
46 {
47 private static Logger LOG = Logger.getLogger(ZkLogAnalyzer.class);
48 private static boolean dump = false; ;
49 final static ZNRecordSerializer _deserializer = new ZNRecordSerializer();
50
51 static class Stats
52 {
53 int msgSentCount = 0;
54 int msgSentCount_O2S = 0;
55 int msgSentCount_S2M = 0;
56 int msgSentCount_M2S = 0;
57 int msgDeleteCount = 0;
58 int msgModifyCount = 0;
59 int curStateCreateCount = 0;
60 int curStateUpdateCount = 0;
61 int extViewCreateCount = 0;
62 int extViewUpdateCount = 0;
63 }
64
65 static String getAttributeValue(String line, String attribute)
66 {
67 if (line == null)
68 return null;
69 String[] parts = line.split("\\s");
70 if (parts != null && parts.length > 0)
71 {
72 for (int i = 0; i < parts.length; i++)
73 {
74 if (parts[i].startsWith(attribute))
75 {
76 String val = parts[i].substring(attribute.length());
77 return val;
78 }
79 }
80 }
81 return null;
82 }
83
84 static String findLastCSUpdateBetween(List<String> csUpdateLines, long start, long end)
85 {
86 long lastCSUpdateTimestamp = Long.MIN_VALUE;
87 String lastCSUpdateLine = null;
88 for (String line : csUpdateLines)
89 {
90
91 long timestamp = Long.parseLong(getAttributeValue(line, "time:"));
92 if (timestamp >= start && timestamp <= end && timestamp > lastCSUpdateTimestamp)
93 {
94 lastCSUpdateTimestamp = timestamp;
95 lastCSUpdateLine = line;
96 }
97 }
98 assert (lastCSUpdateLine != null) : "No CS update between " + start + " - " + end;
99 return lastCSUpdateLine;
100 }
101
102 static ZNRecord getZNRecord(String line)
103 {
104 ZNRecord record = null;
105 String value = getAttributeValue(line, "data:");
106 if (value != null)
107 {
108 record = (ZNRecord) _deserializer.deserialize(value.getBytes());
109
110
111
112
113 }
114 return record;
115 }
116
117 public static void main(String[] args) throws Exception
118 {
119 if (args.length != 3)
120 {
121 System.err.println("USAGE: ZkLogAnalyzer zkLogDir clusterName testStartTime (yyMMdd_hhmmss_SSS)");
122 System.exit(1);
123 }
124
125 System.out.println("ZkLogAnalyzer called with args: " + Arrays.toString(args));
126
127
128
129
130 String zkLogDir = args[0];
131 String clusterName = args[1];
132
133 String startTimeStr = args[2];
134
135
136 SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_hhmmss_SSS");
137 Date date = formatter.parse(startTimeStr);
138 long startTimeStamp = date.getTime();
139
140 System.out.println(clusterName + " created at " + date);
141 while (zkLogDir.endsWith("/"))
142 {
143 zkLogDir = zkLogDir.substring(0, zkLogDir.length() - 1);
144 }
145 if (!zkLogDir.endsWith("/version-2"))
146 {
147 zkLogDir = zkLogDir + "/version-2";
148 }
149 File dir = new File(zkLogDir);
150 File[] zkLogs = dir.listFiles(new FileFilter()
151 {
152
153 @Override
154 public boolean accept(File file)
155 {
156 return file.isFile() && (file.getName().indexOf("log") != -1);
157 }
158 });
159
160
161 TreeMap<Long, String> lastZkLogs = new TreeMap<Long, String>();
162 for (File file : zkLogs)
163 {
164 if (file.lastModified() > startTimeStamp)
165 {
166 lastZkLogs.put(file.lastModified(), file.getAbsolutePath());
167 }
168 }
169
170 List<String> parsedZkLogs = new ArrayList<String>();
171 int i = 0;
172 System.out.println("zk logs last modified later than "
173 + new Timestamp(startTimeStamp));
174 for (Long lastModified : lastZkLogs.keySet())
175 {
176 String fileName = lastZkLogs.get(lastModified);
177 System.out.println(new Timestamp(lastModified) + ": "
178 + (fileName.substring(fileName.lastIndexOf('/') + 1)));
179
180 String parsedFileName = "zkLogAnalyzor_zklog.parsed" + i;
181 i++;
182 ZKLogFormatter.main(new String[] { "log", fileName, parsedFileName });
183 parsedZkLogs.add(parsedFileName);
184 }
185
186
187 Map<String, String> sessionMap = new HashMap<String, String>();
188
189
190
191
192
193 List<String> csUpdateLines = new ArrayList<String>();
194
195 String leaderSession = null;
196
197 System.out.println();
198 Stats stats = new Stats();
199 long lastTestStartTimestamp = Long.MAX_VALUE;
200 long controllerStartTime = 0;
201 for (String parsedZkLog : parsedZkLogs)
202 {
203
204 FileInputStream fis = new FileInputStream(parsedZkLog);
205 BufferedReader br = new BufferedReader(new InputStreamReader(fis));
206
207 String inputLine;
208 while ((inputLine = br.readLine()) != null)
209 {
210 String timestamp = getAttributeValue(inputLine, "time:");
211 if (timestamp == null)
212 {
213 continue;
214 }
215 long timestampVal = Long.parseLong(timestamp);
216 if (timestampVal < startTimeStamp)
217 {
218 continue;
219 }
220
221 if (dump == true)
222 {
223 String printLine = inputLine.replaceAll("data:.*", "");
224 printLine = new Timestamp(timestampVal) + " " + printLine.substring(printLine.indexOf("session:"));
225 System.err.println(printLine);
226 }
227
228 if (inputLine.indexOf("/start_disable") != -1)
229 {
230 dump = true;
231 }
232 if (inputLine.indexOf("/" + clusterName + "/CONFIGS/CLUSTER/verify") != -1)
233 {
234 String type = getAttributeValue(inputLine, "type:");
235 if (type.equals("delete"))
236 {
237 System.out.println(timestamp + ": verify done");
238 System.out.println("lastTestStartTimestamp:" + lastTestStartTimestamp);
239 String lastCSUpdateLine =
240 findLastCSUpdateBetween(csUpdateLines,
241 lastTestStartTimestamp,
242 timestampVal);
243 long lastCSUpdateTimestamp =
244 Long.parseLong(getAttributeValue(lastCSUpdateLine, "time:"));
245 System.out.println("Last CS Update:" + lastCSUpdateTimestamp);
246
247 System.out.println("state transition latency: "
248 + +(lastCSUpdateTimestamp - lastTestStartTimestamp) + "ms");
249
250 System.out.println("state transition latency since controller start: "
251 + +(lastCSUpdateTimestamp - controllerStartTime) + "ms");
252
253 System.out.println("Create MSG\t" + stats.msgSentCount + "\t"
254 + stats.msgSentCount_O2S + "\t" + stats.msgSentCount_S2M + "\t"
255 + stats.msgSentCount_M2S);
256 System.out.println("Modify MSG\t" + stats.msgModifyCount);
257 System.out.println("Delete MSG\t" + stats.msgDeleteCount);
258 System.out.println("Create CS\t" + stats.curStateCreateCount);
259 System.out.println("Update CS\t" + stats.curStateUpdateCount);
260 System.out.println("Create EV\t" + stats.extViewCreateCount);
261 System.out.println("Update EV\t" + stats.extViewUpdateCount);
262
263 System.out.println();
264 stats = new Stats();
265 lastTestStartTimestamp = Long.MAX_VALUE;
266 }
267 }
268 else if (inputLine.indexOf("/" + clusterName + "/LIVEINSTANCES/") != -1)
269 {
270
271 if (timestampVal < lastTestStartTimestamp)
272 {
273 System.out.println("START cluster. SETTING lastTestStartTimestamp to "
274 + new Timestamp(timestampVal) + "\nline:" + inputLine);
275 lastTestStartTimestamp = timestampVal;
276 }
277
278 ZNRecord record = getZNRecord(inputLine);
279 LiveInstance liveInstance = new LiveInstance(record);
280 String session = getAttributeValue(inputLine, "session:");
281 sessionMap.put(session, inputLine);
282 System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LIVEINSTANCE "
283 + liveInstance.getInstanceName());
284 }
285 else if (inputLine.indexOf("closeSession") != -1)
286 {
287
288 String session = getAttributeValue(inputLine, "session:");
289 if (sessionMap.containsKey(session))
290 {
291 if (timestampVal < lastTestStartTimestamp)
292 {
293 System.out.println("KILL node. SETTING lastTestStartTimestamp to " + timestampVal
294 + " line:" + inputLine);
295 lastTestStartTimestamp = timestampVal;
296 }
297 String line = sessionMap.get(session);
298 ZNRecord record = getZNRecord(line);
299 LiveInstance liveInstance = new LiveInstance(record);
300
301 System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": close session "
302 + liveInstance.getInstanceName());
303 dump = true;
304 }
305 }
306 else if (inputLine.indexOf("/" + clusterName + "/CONFIGS/PARTICIPANT") != -1)
307 {
308
309 String type = getAttributeValue(inputLine, "type:");
310 if (type.equals("setData") && inputLine.indexOf("HELIX_DISABLED_PARTITION") != -1)
311 {
312 if (timestampVal < lastTestStartTimestamp)
313 {
314 System.out.println("DISABLE partition. SETTING lastTestStartTimestamp to " + timestampVal
315 + " line:" + inputLine);
316 lastTestStartTimestamp = timestampVal;
317 }
318 }
319 } else if (inputLine.indexOf("/" + clusterName + "/CONTROLLER/LEADER") != -1)
320 {
321
322 ZNRecord record = getZNRecord(inputLine);
323 LiveInstance liveInstance = new LiveInstance(record);
324 String session = getAttributeValue(inputLine, "session:");
325 leaderSession = session;
326 controllerStartTime = Long.parseLong(getAttributeValue(inputLine, "time:"));
327 sessionMap.put(session, inputLine);
328 System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LEADER "
329 + liveInstance.getInstanceName());
330 }
331 else if (inputLine.indexOf("/" + clusterName + "/") != -1
332 && inputLine.indexOf("/CURRENTSTATES/") != -1)
333 {
334 String type = getAttributeValue(inputLine, "type:");
335 if (type.equals("create"))
336 {
337 stats.curStateCreateCount++;
338 }
339 else if (type.equals("setData"))
340 {
341 String path = getAttributeValue(inputLine, "path:");
342 csUpdateLines.add(inputLine);
343 stats.curStateUpdateCount++;
344
345 System.out.println("Update currentstate:"
346 + new Timestamp(Long.parseLong(timestamp)) + ":" + timestamp + " path:"
347 + path);
348 }
349 }
350 else if (inputLine.indexOf("/" + clusterName + "/EXTERNALVIEW/") != -1)
351 {
352 String session = getAttributeValue(inputLine, "session:");
353 if (session.equals(leaderSession))
354 {
355 String type = getAttributeValue(inputLine, "type:");
356 if (type.equals("create"))
357 {
358 stats.extViewCreateCount++;
359 }
360 else if (type.equals("setData"))
361 {
362 stats.extViewUpdateCount++;
363 }
364 }
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384 }
385 else if (inputLine.indexOf("/" + clusterName + "/") != -1
386 && inputLine.indexOf("/MESSAGES/") != -1)
387 {
388 String type = getAttributeValue(inputLine, "type:");
389
390 if (type.equals("create"))
391 {
392 ZNRecord record = getZNRecord(inputLine);
393 Message msg = new Message(record);
394 String sendSession = getAttributeValue(inputLine, "session:");
395 if (sendSession.equals(leaderSession)
396 && msg.getMsgType().equals("STATE_TRANSITION")
397 && msg.getMsgState() == MessageState.NEW)
398 {
399
400 stats.msgSentCount++;
401
402 if (msg.getFromState().equals("OFFLINE")
403 && msg.getToState().equals("SLAVE"))
404 {
405 stats.msgSentCount_O2S++;
406 }
407 else if (msg.getFromState().equals("SLAVE")
408 && msg.getToState().equals("MASTER"))
409 {
410 stats.msgSentCount_S2M++;
411 }
412 else if (msg.getFromState().equals("MASTER")
413 && msg.getToState().equals("SLAVE"))
414 {
415 stats.msgSentCount_M2S++;
416 }
417
418
419 }
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447 }
448 else if (type.equals("setData"))
449 {
450 stats.msgModifyCount++;
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484 }
485 else if (type.equals("delete"))
486 {
487 stats.msgDeleteCount++;
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505 }
506 }
507 }
508 }
509 }
510 }