View Javadoc

1   package org.apache.helix.tools;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.BufferedReader;
23  import java.io.File;
24  import java.io.FileFilter;
25  import java.io.FileInputStream;
26  import java.io.InputStreamReader;
27  import java.sql.Timestamp;
28  import java.text.SimpleDateFormat;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Date;
32  import java.util.HashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.TreeMap;
36  
37  import org.apache.helix.ZNRecord;
38  import org.apache.helix.manager.zk.ZNRecordSerializer;
39  import org.apache.helix.model.LiveInstance;
40  import org.apache.helix.model.Message;
41  import org.apache.helix.model.Message.MessageState;
42  import org.apache.log4j.Logger;
43  
44  
45  public class ZkLogAnalyzer
46  {
47    private static Logger           LOG           = Logger.getLogger(ZkLogAnalyzer.class);
48    private static boolean          dump          = false;                                 ;
49    final static ZNRecordSerializer _deserializer = new ZNRecordSerializer();
50  
51    static class Stats
52    {
53      int msgSentCount        = 0;
54      int msgSentCount_O2S    = 0; // Offline to Slave
55      int msgSentCount_S2M    = 0; // Slave to Master
56      int msgSentCount_M2S    = 0; // Master to Slave
57      int msgDeleteCount      = 0;
58      int msgModifyCount      = 0;
59      int curStateCreateCount = 0;
60      int curStateUpdateCount = 0;
61      int extViewCreateCount  = 0;
62      int extViewUpdateCount  = 0;
63    }
64  
65    static String getAttributeValue(String line, String attribute)
66    {
67      if (line == null)
68        return null;
69      String[] parts = line.split("\\s");
70      if (parts != null && parts.length > 0)
71      {
72        for (int i = 0; i < parts.length; i++)
73        {
74          if (parts[i].startsWith(attribute))
75          {
76            String val = parts[i].substring(attribute.length());
77            return val;
78          }
79        }
80      }
81      return null;
82    }
83  
84    static String findLastCSUpdateBetween(List<String> csUpdateLines, long start, long end)
85    {
86      long lastCSUpdateTimestamp = Long.MIN_VALUE;
87      String lastCSUpdateLine = null;
88      for (String line : csUpdateLines)
89      {
90        // ZNRecord record = getZNRecord(line);
91        long timestamp = Long.parseLong(getAttributeValue(line, "time:"));
92        if (timestamp >= start && timestamp <= end && timestamp > lastCSUpdateTimestamp)
93        {
94          lastCSUpdateTimestamp = timestamp;
95          lastCSUpdateLine = line;
96        }
97      }
98      assert (lastCSUpdateLine != null) : "No CS update between " + start + " - " + end;
99      return lastCSUpdateLine;
100   }
101 
102   static ZNRecord getZNRecord(String line)
103   {
104     ZNRecord record = null;
105     String value = getAttributeValue(line, "data:");
106     if (value != null)
107     {
108       record = (ZNRecord) _deserializer.deserialize(value.getBytes());
109       // if (record == null)
110       // {
111       // System.out.println(line);
112       // }
113     }
114     return record;
115   }
116 
117   public static void main(String[] args) throws Exception
118   {
119     if (args.length != 3)
120     {
121       System.err.println("USAGE: ZkLogAnalyzer zkLogDir clusterName testStartTime (yyMMdd_hhmmss_SSS)");
122       System.exit(1);
123     }
124 
125     System.out.println("ZkLogAnalyzer called with args: " + Arrays.toString(args));
126     // get create-timestamp of "/" + clusterName
127     // find all zk logs after that create-timestamp and parse them
128     // save parsed log in /tmp/zkLogAnalyzor_zklog.parsed0,1,2...
129 
130     String zkLogDir = args[0];
131     String clusterName = args[1];
132     // String zkAddr = args[2];
133     String startTimeStr = args[2];
134     // ZkClient zkClient = new ZkClient(zkAddr);
135     // Stat clusterCreateStat = zkClient.getStat("/" + clusterName);
136     SimpleDateFormat formatter = new SimpleDateFormat("yyMMdd_hhmmss_SSS");
137     Date date = formatter.parse(startTimeStr);
138     long startTimeStamp = date.getTime();
139 
140     System.out.println(clusterName + " created at " + date);
141     while (zkLogDir.endsWith("/"))
142     {
143       zkLogDir = zkLogDir.substring(0, zkLogDir.length() - 1);
144     }
145     if (!zkLogDir.endsWith("/version-2"))
146     {
147       zkLogDir = zkLogDir + "/version-2";
148     }
149     File dir = new File(zkLogDir);
150     File[] zkLogs = dir.listFiles(new FileFilter()
151     {
152 
153       @Override
154       public boolean accept(File file)
155       {
156         return file.isFile() && (file.getName().indexOf("log") != -1);
157       }
158     });
159 
160     // lastModified time -> zkLog
161     TreeMap<Long, String> lastZkLogs = new TreeMap<Long, String>();
162     for (File file : zkLogs)
163     {
164       if (file.lastModified() > startTimeStamp)
165       {
166         lastZkLogs.put(file.lastModified(), file.getAbsolutePath());
167       }
168     }
169 
170     List<String> parsedZkLogs = new ArrayList<String>();
171     int i = 0;
172     System.out.println("zk logs last modified later than "
173         + new Timestamp(startTimeStamp));
174     for (Long lastModified : lastZkLogs.keySet())
175     {
176       String fileName = lastZkLogs.get(lastModified);
177       System.out.println(new Timestamp(lastModified) + ": "
178           + (fileName.substring(fileName.lastIndexOf('/') + 1)));
179 
180       String parsedFileName = "zkLogAnalyzor_zklog.parsed" + i;
181       i++;
182       ZKLogFormatter.main(new String[] { "log", fileName, parsedFileName });
183       parsedZkLogs.add(parsedFileName);
184     }
185 
186     // sessionId -> create liveInstance line
187     Map<String, String> sessionMap = new HashMap<String, String>();
188 
189     // message send lines in time order
190     // List<String> sendMessageLines = new ArrayList<String>();
191 
192     // CS update lines in time order
193     List<String> csUpdateLines = new ArrayList<String>();
194 
195     String leaderSession = null;
196 
197     System.out.println();
198     Stats stats = new Stats();
199     long lastTestStartTimestamp = Long.MAX_VALUE;
200     long controllerStartTime = 0;
201     for (String parsedZkLog : parsedZkLogs)
202     {
203 
204       FileInputStream fis = new FileInputStream(parsedZkLog);
205       BufferedReader br = new BufferedReader(new InputStreamReader(fis));
206 
207       String inputLine;
208       while ((inputLine = br.readLine()) != null)
209       {
210         String timestamp = getAttributeValue(inputLine, "time:");
211         if (timestamp == null)
212         {
213           continue;
214         }
215         long timestampVal = Long.parseLong(timestamp);
216         if (timestampVal < startTimeStamp)
217         {
218           continue;
219         }
220 
221         if (dump == true)
222         {
223           String printLine = inputLine.replaceAll("data:.*", "");
224           printLine = new Timestamp(timestampVal) + " " + printLine.substring(printLine.indexOf("session:"));
225           System.err.println(printLine);
226         }
227 
228         if (inputLine.indexOf("/start_disable") != -1)
229         {
230           dump = true;
231         }
232         if (inputLine.indexOf("/" + clusterName + "/CONFIGS/CLUSTER/verify") != -1)
233         {
234           String type = getAttributeValue(inputLine, "type:");
235           if (type.equals("delete"))
236           {
237             System.out.println(timestamp + ": verify done");
238             System.out.println("lastTestStartTimestamp:" + lastTestStartTimestamp);
239             String lastCSUpdateLine =
240                 findLastCSUpdateBetween(csUpdateLines,
241                                         lastTestStartTimestamp,
242                                         timestampVal);
243             long lastCSUpdateTimestamp =
244                 Long.parseLong(getAttributeValue(lastCSUpdateLine, "time:"));
245             System.out.println("Last CS Update:" + lastCSUpdateTimestamp);
246 
247             System.out.println("state transition latency: "
248                 + +(lastCSUpdateTimestamp - lastTestStartTimestamp) + "ms");
249 
250             System.out.println("state transition latency since controller start: "
251                 + +(lastCSUpdateTimestamp - controllerStartTime) + "ms");
252 
253             System.out.println("Create MSG\t" + stats.msgSentCount + "\t"
254                 + stats.msgSentCount_O2S + "\t" + stats.msgSentCount_S2M + "\t"
255                 + stats.msgSentCount_M2S);
256             System.out.println("Modify MSG\t" + stats.msgModifyCount);
257             System.out.println("Delete MSG\t" + stats.msgDeleteCount);
258             System.out.println("Create CS\t" + stats.curStateCreateCount);
259             System.out.println("Update CS\t" + stats.curStateUpdateCount);
260             System.out.println("Create EV\t" + stats.extViewCreateCount);
261             System.out.println("Update EV\t" + stats.extViewUpdateCount);
262 
263             System.out.println();
264             stats = new Stats();
265             lastTestStartTimestamp = Long.MAX_VALUE;
266           }
267         }
268         else if (inputLine.indexOf("/" + clusterName + "/LIVEINSTANCES/") != -1)
269         {
270           // cluster startup
271           if (timestampVal < lastTestStartTimestamp)
272           {
273             System.out.println("START cluster. SETTING lastTestStartTimestamp to "
274                 + new Timestamp(timestampVal) + "\nline:" + inputLine);
275             lastTestStartTimestamp = timestampVal;
276           }
277 
278           ZNRecord record = getZNRecord(inputLine);
279           LiveInstance liveInstance = new LiveInstance(record);
280           String session = getAttributeValue(inputLine, "session:");
281           sessionMap.put(session, inputLine);
282           System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LIVEINSTANCE "
283               + liveInstance.getInstanceName());
284         }
285         else if (inputLine.indexOf("closeSession") != -1)
286         {
287           // kill any instance
288           String session = getAttributeValue(inputLine, "session:");
289           if (sessionMap.containsKey(session))
290           {
291             if (timestampVal < lastTestStartTimestamp)
292             {
293               System.out.println("KILL node. SETTING lastTestStartTimestamp to " + timestampVal
294                   + " line:" + inputLine);
295               lastTestStartTimestamp = timestampVal;
296             }
297             String line = sessionMap.get(session);
298             ZNRecord record = getZNRecord(line);
299             LiveInstance liveInstance = new LiveInstance(record);
300 
301             System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": close session "
302                 + liveInstance.getInstanceName());
303             dump = true;
304           }
305         }
306         else if (inputLine.indexOf("/" + clusterName + "/CONFIGS/PARTICIPANT") != -1)
307         {
308           // disable a partition
309           String type = getAttributeValue(inputLine, "type:");
310           if (type.equals("setData") && inputLine.indexOf("HELIX_DISABLED_PARTITION") != -1)
311           {
312             if (timestampVal < lastTestStartTimestamp)
313             {
314               System.out.println("DISABLE partition. SETTING lastTestStartTimestamp to " + timestampVal
315                   + " line:" + inputLine);
316               lastTestStartTimestamp = timestampVal;
317             }
318           }
319         } else if (inputLine.indexOf("/" + clusterName + "/CONTROLLER/LEADER") != -1)
320         {
321           // leaderLine = inputLine;
322           ZNRecord record = getZNRecord(inputLine);
323           LiveInstance liveInstance = new LiveInstance(record);
324           String session = getAttributeValue(inputLine, "session:");
325           leaderSession = session;
326           controllerStartTime = Long.parseLong(getAttributeValue(inputLine, "time:"));
327           sessionMap.put(session, inputLine);
328           System.out.println(new Timestamp(Long.parseLong(timestamp)) + ": create LEADER "
329               + liveInstance.getInstanceName());
330         }
331         else if (inputLine.indexOf("/" + clusterName + "/") != -1
332             && inputLine.indexOf("/CURRENTSTATES/") != -1)
333         {
334           String type = getAttributeValue(inputLine, "type:");
335           if (type.equals("create"))
336           {
337             stats.curStateCreateCount++;
338           }
339           else if (type.equals("setData"))
340           {
341             String path = getAttributeValue(inputLine, "path:");
342             csUpdateLines.add(inputLine);
343             stats.curStateUpdateCount++;
344             // getAttributeValue(line, "data");
345             System.out.println("Update currentstate:"
346                 + new Timestamp(Long.parseLong(timestamp)) + ":" + timestamp + " path:"
347                 + path);
348           }
349         }
350         else if (inputLine.indexOf("/" + clusterName + "/EXTERNALVIEW/") != -1)
351         {
352           String session = getAttributeValue(inputLine, "session:");
353           if (session.equals(leaderSession))
354           {
355             String type = getAttributeValue(inputLine, "type:");
356             if (type.equals("create"))
357             {
358               stats.extViewCreateCount++;
359             }
360             else if (type.equals("setData"))
361             {
362               stats.extViewUpdateCount++;
363             }
364           }
365 
366           // pos = inputLine.indexOf("EXTERNALVIEW");
367           // pos = inputLine.indexOf("data:{", pos);
368           // if (pos != -1)
369           // {
370           // String timestamp = getAttributeValue(inputLine, "time:");
371           // ZNRecord record =
372           // (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
373           // .getBytes());
374           // ExternalView extView = new ExternalView(record);
375           // int masterCnt = ClusterStateVerifier.countStateNbInExtView(extView,
376           // "MASTER");
377           // int slaveCnt = ClusterStateVerifier.countStateNbInExtView(extView, "SLAVE");
378           // if (masterCnt == 1200)
379           // {
380           // System.out.println(timestamp + ": externalView " + extView.getResourceName()
381           // + " has " + masterCnt + " MASTER, " + slaveCnt + " SLAVE");
382           // }
383           // }
384         }
385         else if (inputLine.indexOf("/" + clusterName + "/") != -1
386             && inputLine.indexOf("/MESSAGES/") != -1)
387         {
388           String type = getAttributeValue(inputLine, "type:");
389 
390           if (type.equals("create"))
391           {
392             ZNRecord record = getZNRecord(inputLine);
393             Message msg = new Message(record);
394             String sendSession = getAttributeValue(inputLine, "session:");
395             if (sendSession.equals(leaderSession)
396                 && msg.getMsgType().equals("STATE_TRANSITION")
397                 && msg.getMsgState() == MessageState.NEW)
398             {
399               // sendMessageLines.add(inputLine);
400               stats.msgSentCount++;
401 
402               if (msg.getFromState().equals("OFFLINE")
403                   && msg.getToState().equals("SLAVE"))
404               {
405                 stats.msgSentCount_O2S++;
406               }
407               else if (msg.getFromState().equals("SLAVE")
408                   && msg.getToState().equals("MASTER"))
409               {
410                 stats.msgSentCount_S2M++;
411               }
412               else if (msg.getFromState().equals("MASTER")
413                   && msg.getToState().equals("SLAVE"))
414               {
415                 stats.msgSentCount_M2S++;
416               }
417               // System.out.println("Message create:"+new
418               // Timestamp(Long.parseLong(timestamp)));
419             }
420 
421             // pos = inputLine.indexOf("MESSAGES");
422             // pos = inputLine.indexOf("data:{", pos);
423             // if (pos != -1)
424             // {
425             //
426             // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
427             // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
428             // Message msg = new Message(record);
429             // MessageState msgState = msg.getMsgState();
430             // String msgType = msg.getMsgType();
431             // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.NEW)
432             // {
433             // if (!msgs.containsKey(msg.getMsgId()))
434             // {
435             // msgs.put(msg.getMsgId(), new MsgItem(Long.parseLong(timestamp), msg));
436             // }
437             // else
438             // {
439             // LOG.error("msg: " + msg.getMsgId() + " already sent");
440             // }
441             //
442             // System.out.println(timestamp + ": sendMsg " + msg.getPartitionName() + "("
443             // + msg.getFromState() + "->" + msg.getToState() + ") to "
444             // + msg.getTgtName() + ", size: " + msgBytes.length);
445             // }
446             // }
447           }
448           else if (type.equals("setData"))
449           {
450             stats.msgModifyCount++;
451             // pos = inputLine.indexOf("MESSAGES");
452             // pos = inputLine.indexOf("data:{", pos);
453             // if (pos != -1)
454             // {
455             //
456             // byte[] msgBytes = inputLine.substring(pos + 5).getBytes();
457             // ZNRecord record = (ZNRecord) _deserializer.deserialize(msgBytes);
458             // Message msg = new Message(record);
459             // MessageState msgState = msg.getMsgState();
460             // String msgType = msg.getMsgType();
461             // if (msgType.equals("STATE_TRANSITION") && msgState == MessageState.READ)
462             // {
463             // if (!msgs.containsKey(msg.getMsgId()))
464             // {
465             // LOG.error("msg: " + msg.getMsgId() + " never sent");
466             // }
467             // else
468             // {
469             // MsgItem msgItem = msgs.get(msg.getMsgId());
470             // if (msgItem.readTime == 0)
471             // {
472             // msgItem.readTime = Long.parseLong(timestamp);
473             // msgs.put(msg.getMsgId(), msgItem);
474             // // System.out.println(timestamp + ": readMsg " + msg.getPartitionName()
475             // // + "("
476             // // + msg.getFromState() + "->" + msg.getToState() + ") to "
477             // // + msg.getTgtName() + ", latency: " + (msgItem.readTime -
478             // // msgItem.sendTime));
479             // }
480             // }
481             //
482             // }
483             // }
484           }
485           else if (type.equals("delete"))
486           {
487             stats.msgDeleteCount++;
488             // String msgId = path.substring(path.lastIndexOf('/') + 1);
489             // if (msgs.containsKey(msgId))
490             // {
491             // MsgItem msgItem = msgs.get(msgId);
492             // Message msg = msgItem.msg;
493             // msgItem.deleteTime = Long.parseLong(timestamp);
494             // msgs.put(msgId, msgItem);
495             // msgItem.latency = msgItem.deleteTime - msgItem.sendTime;
496             // System.out.println(timestamp + ": delMsg " + msg.getPartitionName() + "("
497             // + msg.getFromState() + "->" + msg.getToState() + ") to "
498             // + msg.getTgtName() + ", latency: " + msgItem.latency);
499             // }
500             // else
501             // {
502             // // messages other than STATE_TRANSITION message
503             // // LOG.error("msg: " + msgId + " never sent");
504             // }
505           }
506         }
507       } // end of [br.readLine()) != null]
508     }
509   }
510 }