View Javadoc

1   package org.apache.helix.tools;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.BufferedReader;
23  import java.io.BufferedWriter;
24  import java.io.File;
25  import java.io.FileInputStream;
26  import java.io.FileNotFoundException;
27  import java.io.FileOutputStream;
28  import java.io.IOException;
29  import java.io.InputStreamReader;
30  import java.io.OutputStreamWriter;
31  import java.util.HashMap;
32  import java.util.List;
33  import java.util.Map;
34  
35  import org.apache.helix.ZNRecord;
36  import org.apache.helix.manager.zk.ZNRecordSerializer;
37  import org.apache.helix.model.IdealState.IdealStateProperty;
38  import org.apache.helix.util.HelixUtil;
39  
40  
41  public class ZkLogCSVFormatter
42  {
43    private static final ZNRecordSerializer _deserializer = new ZNRecordSerializer();
44    private static String _fieldDelim = ",";
45  
46    /**
47     * @param args
48     */
49    public static void main(String[] args) throws Exception
50    {
51      if (args.length != 2)
52      {
53        System.err.println("USAGE: ZkLogCSVFormatter log_file output_dir");
54        System.exit(2);
55      }
56      File outputDir = new File(args[1]);
57      if (!outputDir.exists() || !outputDir.isDirectory())
58      {
59        System.err.println(outputDir.getAbsolutePath() + " does NOT exist or is NOT a directory");
60        System.exit(2);
61      }
62      format(args[0], args[1]);
63    }
64  
65    private static void formatter(BufferedWriter bw, String... args)
66    {
67      StringBuffer sb = new StringBuffer();
68  
69      if (args.length == 0)
70      {
71        return;
72      }
73      else
74      {
75        sb.append(args[0]);
76        for (int i = 1; i < args.length; i++)
77        {
78          sb.append(_fieldDelim).append(args[i]);
79        }
80      }
81  
82      try
83      {
84        bw.write(sb.toString());
85        bw.newLine();
86        // System.out.println(sb.toString());
87      }
88      catch (IOException e)
89      {
90        // TODO Auto-generated catch block
91        e.printStackTrace();
92      }
93    }
94  
95    private static String getAttributeValue(String line, String attribute)
96    {
97      String[] parts = line.split("\\s");
98      if (parts != null && parts.length > 0)
99      {
100       for (int i = 0; i < parts.length; i++)
101       {
102         if (parts[i].startsWith(attribute))
103         {
104           String val = parts[i].substring(attribute.length());
105           return val;
106         }
107       }
108     }
109     return null;
110   }
111 
112   private static void format(String logfilepath, String outputDir) throws FileNotFoundException
113   {
114     try
115     {
116       // input file
117       FileInputStream fis = new FileInputStream(logfilepath);
118       BufferedReader br = new BufferedReader(new InputStreamReader(fis));
119 
120       // output files
121       FileOutputStream isFos = new FileOutputStream(outputDir + "/" + "idealState.csv");
122       BufferedWriter isBw = new BufferedWriter(new OutputStreamWriter(isFos));
123 
124       FileOutputStream cfgFos = new FileOutputStream(outputDir + "/" + "config.csv");
125       BufferedWriter cfgBw = new BufferedWriter(new OutputStreamWriter(cfgFos));
126 
127       FileOutputStream evFos = new FileOutputStream(outputDir + "/" + "externalView.csv");
128       BufferedWriter evBw = new BufferedWriter(new OutputStreamWriter(evFos));
129 
130       FileOutputStream smdCntFos =
131           new FileOutputStream(outputDir + "/" + "stateModelDefStateCount.csv");
132       BufferedWriter smdCntBw = new BufferedWriter(new OutputStreamWriter(smdCntFos));
133 
134       FileOutputStream smdNextFos =
135           new FileOutputStream(outputDir + "/" + "stateModelDefStateNext.csv");
136       BufferedWriter smdNextBw = new BufferedWriter(new OutputStreamWriter(smdNextFos));
137 
138       FileOutputStream csFos = new FileOutputStream(outputDir + "/" + "currentState.csv");
139       BufferedWriter csBw = new BufferedWriter(new OutputStreamWriter(csFos));
140 
141       FileOutputStream msgFos = new FileOutputStream(outputDir + "/" + "messages.csv");
142       BufferedWriter msgBw = new BufferedWriter(new OutputStreamWriter(msgFos));
143 
144       FileOutputStream hrPerfFos = new FileOutputStream(outputDir + "/" + "healthReportDefaultPerfCounters.csv");
145       BufferedWriter hrPerfBw = new BufferedWriter(new OutputStreamWriter(hrPerfFos));
146 
147       FileOutputStream liFos =
148           new FileOutputStream(outputDir + "/" + "liveInstances.csv");
149       BufferedWriter liBw = new BufferedWriter(new OutputStreamWriter(liFos));
150 
151       formatter(cfgBw, "timestamp", "instanceName", "host", "port", "enabled");
152       formatter(isBw,
153                 "timestamp",
154                 "resourceName",
155                 "partitionNumber",
156                 "mode",
157                 "partition",
158                 "instanceName",
159                 "priority");
160       formatter(evBw, "timestamp", "resourceName", "partition", "instanceName", "state");
161       formatter(smdCntBw, "timestamp", "stateModel", "state", "count");
162       formatter(smdNextBw, "timestamp", "stateModel", "from", "to", "next");
163       formatter(liBw, "timestamp", "instanceName", "sessionId", "Operation");
164       formatter(csBw,
165                 "timestamp",
166                 "resourceName",
167                 "partition",
168                 "instanceName",
169                 "sessionId",
170                 "state");
171       formatter(msgBw,
172                 "timestamp",
173                 "resourceName",
174                 "partition",
175                 "instanceName",
176                 "sessionId",
177                 "from",
178                 "to",
179                 "messageType",
180                 "messageState");
181       formatter(hrPerfBw,
182                 "timestamp",
183                 "instanceName",
184                 "availableCPUs",
185                 "averageSystemLoad",
186                 "freeJvmMemory",
187                 "freePhysicalMemory",
188                 "totalJvmMemory");
189 
190       Map<String, ZNRecord> liveInstanceSessionMap = new HashMap<String, ZNRecord>();
191 
192       int pos;
193       String inputLine;
194       while ((inputLine = br.readLine()) != null)
195       {
196         if (inputLine.indexOf("CONFIGS") != -1)
197         {
198           pos = inputLine.indexOf("CONFIGS");
199           pos = inputLine.indexOf("data:{", pos);
200           if (pos != -1)
201           {
202             String timestamp = getAttributeValue(inputLine, "time:");
203             ZNRecord record =
204                 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
205                                                               .getBytes());
206 
207             formatter(cfgBw,
208                       timestamp,
209                       record.getId(),
210                       record.getSimpleField("HOST"),
211                       record.getSimpleField("PORT"),
212                       record.getSimpleField("ENABLED"));
213 
214           }
215         }
216         else if (inputLine.indexOf("IDEALSTATES") != -1)
217         {
218           pos = inputLine.indexOf("IDEALSTATES");
219           pos = inputLine.indexOf("data:{", pos);
220           if (pos != -1)
221           {
222             String timestamp = getAttributeValue(inputLine, "time:");
223             ZNRecord record =
224                 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
225                                                               .getBytes());
226             // System.out.println("record=" + record);
227             for (String partition : record.getListFields().keySet())
228             {
229               List<String> preferenceList = record.getListFields().get(partition);
230               for (int i = 0; i < preferenceList.size(); i++)
231               {
232                 String instance = preferenceList.get(i);
233                 formatter(isBw,
234                           timestamp,
235                           record.getId(),
236                           record.getSimpleField(IdealStateProperty.NUM_PARTITIONS.toString()),
237                           record.getSimpleField(IdealStateProperty.IDEAL_STATE_MODE.toString()),
238                           partition,
239                           instance,
240                           Integer.toString(i));
241               }
242             }
243           }
244         }
245         else if (inputLine.indexOf("LIVEINSTANCES") != -1)
246         {
247           pos = inputLine.indexOf("LIVEINSTANCES");
248           pos = inputLine.indexOf("data:{", pos);
249           if (pos != -1)
250           {
251             String timestamp = getAttributeValue(inputLine, "time:");
252             ZNRecord record =
253                 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
254                                                               .getBytes());
255             formatter(liBw, timestamp, record.getId(), record.getSimpleField("SESSION_ID"), "ADD");
256             String zkSessionId = getAttributeValue(inputLine, "session:");
257             if (zkSessionId == null)
258             {
259               System.err.println("no zk session id associated with the adding of live instance: "
260                   + inputLine);
261             }
262             else
263             {
264               liveInstanceSessionMap.put(zkSessionId, record);
265             }
266           }
267 
268         }
269         else if (inputLine.indexOf("EXTERNALVIEW") != -1)
270         {
271           pos = inputLine.indexOf("EXTERNALVIEW");
272           pos = inputLine.indexOf("data:{", pos);
273           if (pos != -1)
274           {
275             String timestamp = getAttributeValue(inputLine, "time:");
276             ZNRecord record =
277                 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
278                                                               .getBytes());
279             // System.out.println("record=" + record);
280             for (String partition : record.getMapFields().keySet())
281             {
282               Map<String, String> stateMap = record.getMapFields().get(partition);
283               for (String instance : stateMap.keySet())
284               {
285                 String state = stateMap.get(instance);
286                 formatter(evBw, timestamp, record.getId(), partition, instance, state);
287               }
288             }
289           }
290         }
291         else if (inputLine.indexOf("STATEMODELDEFS") != -1)
292         {
293           pos = inputLine.indexOf("STATEMODELDEFS");
294           pos = inputLine.indexOf("data:{", pos);
295           if (pos != -1)
296           {
297             String timestamp = getAttributeValue(inputLine, "time:");
298             ZNRecord record =
299                 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
300                                                               .getBytes());
301 
302             for (String stateInfo : record.getMapFields().keySet())
303             {
304               if (stateInfo.endsWith(".meta"))
305               {
306                 Map<String, String> metaMap = record.getMapFields().get(stateInfo);
307                 formatter(smdCntBw,
308                           timestamp,
309                           record.getId(),
310                           stateInfo.substring(0, stateInfo.indexOf('.')),
311                           metaMap.get("count"));
312               }
313               else if (stateInfo.endsWith(".next"))
314               {
315                 Map<String, String> nextMap = record.getMapFields().get(stateInfo);
316                 for (String destState : nextMap.keySet())
317                 {
318                   formatter(smdNextBw,
319                             timestamp,
320                             record.getId(),
321                             stateInfo.substring(0, stateInfo.indexOf('.')),
322                             destState,
323                             nextMap.get(destState));
324                 }
325               }
326             }
327           }
328         }
329         else if (inputLine.indexOf("CURRENTSTATES") != -1)
330         {
331           pos = inputLine.indexOf("CURRENTSTATES");
332           pos = inputLine.indexOf("data:{", pos);
333           if (pos != -1)
334           {
335             String timestamp = getAttributeValue(inputLine, "time:");
336             ZNRecord record =
337                 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
338                                                               .getBytes());
339             // System.out.println("record=" + record);
340             for (String partition : record.getMapFields().keySet())
341             {
342               Map<String, String> stateMap = record.getMapFields().get(partition);
343               String path = getAttributeValue(inputLine, "path:");
344               if (path != null)
345               {
346                 String instance = HelixUtil.getInstanceNameFromPath(path);
347                 formatter(csBw,
348                           timestamp,
349                           record.getId(),
350                           partition,
351                           instance,
352                           record.getSimpleField("SESSION_ID"),
353                           stateMap.get("CURRENT_STATE"));
354               }
355             }
356           }
357         }
358         else if (inputLine.indexOf("MESSAGES") != -1)
359         {
360           pos = inputLine.indexOf("MESSAGES");
361           pos = inputLine.indexOf("data:{", pos);
362           if (pos != -1)
363           {
364             String timestamp = getAttributeValue(inputLine, "time:");
365             ZNRecord record =
366                 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
367                                                               .getBytes());
368 
369             formatter(msgBw,
370                       timestamp,
371                       record.getSimpleField("RESOURCE_NAME"),
372                       record.getSimpleField("PARTITION_NAME"),
373                       record.getSimpleField("TGT_NAME"),
374                       record.getSimpleField("TGT_SESSION_ID"),
375                       record.getSimpleField("FROM_STATE"),
376                       record.getSimpleField("TO_STATE"),
377                       record.getSimpleField("MSG_TYPE"),
378                       record.getSimpleField("MSG_STATE"));
379           }
380 
381         }
382         else if (inputLine.indexOf("closeSession") != -1)
383         {
384           String zkSessionId = getAttributeValue(inputLine, "session:");
385           if (zkSessionId == null)
386           {
387             System.err.println("no zk session id associated with the closing of zk session: "
388                 + inputLine);
389           }
390           else
391           {
392             ZNRecord record = liveInstanceSessionMap.remove(zkSessionId);
393             // System.err.println("zkSessionId:" + zkSessionId + ", record:" + record);
394             if (record != null)
395             {
396               String timestamp = getAttributeValue(inputLine, "time:");
397               formatter(liBw,
398                         timestamp,
399                         record.getId(),
400                         record.getSimpleField("SESSION_ID"),
401                         "DELETE");
402             }
403           }
404         }
405         else if (inputLine.indexOf("HEALTHREPORT/defaultPerfCounters") != -1)
406         {
407           pos = inputLine.indexOf("HEALTHREPORT/defaultPerfCounters");
408           pos = inputLine.indexOf("data:{", pos);
409           if (pos != -1)
410           {
411             String timestamp = getAttributeValue(inputLine, "time:");
412             ZNRecord record =
413                 (ZNRecord) _deserializer.deserialize(inputLine.substring(pos + 5)
414                                                               .getBytes());
415 
416             String path = getAttributeValue(inputLine, "path:");
417             if (path != null)
418             {
419               String instance = HelixUtil.getInstanceNameFromPath(path);
420               formatter(hrPerfBw,
421                         timestamp,
422                         instance,
423                         record.getSimpleField("availableCPUs"),
424                         record.getSimpleField("averageSystemLoad"),
425                         record.getSimpleField("freeJvmMemory"),
426                         record.getSimpleField("freePhysicalMemory"),
427                         record.getSimpleField("totalJvmMemory"));
428             }
429           }
430         }
431       }
432 
433       br.close();
434       isBw.close();
435       cfgBw.close();
436       evBw.close();
437       smdCntBw.close();
438       smdNextBw.close();
439       csBw.close();
440       msgBw.close();
441       liBw.close();
442       hrPerfBw.close();
443     }
444     catch (Exception e)
445     {
446       System.err.println("Error: " + e.getMessage());
447     }
448   }
449 }