View Javadoc

1   package org.apache.helix.monitoring;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.StringWriter;
23  import java.util.Date;
24  import java.util.List;
25  import java.util.TimerTask;
26  
27  import org.apache.helix.HelixDataAccessor;
28  import org.apache.helix.HelixManager;
29  import org.apache.helix.PropertyType;
30  import org.apache.helix.ZNRecord;
31  import org.apache.helix.PropertyKey.Builder;
32  import org.apache.helix.manager.zk.ZNRecordSerializer;
33  import org.apache.helix.manager.zk.ZkClient;
34  import org.apache.helix.util.HelixUtil;
35  import org.apache.log4j.Logger;
36  import org.apache.zookeeper.data.Stat;
37  import org.codehaus.jackson.map.ObjectMapper;
38  import org.codehaus.jackson.map.SerializationConfig;
39  
40  
41  public class ZKPathDataDumpTask extends TimerTask
42  {
43    static Logger logger = Logger.getLogger(ZKPathDataDumpTask.class);
44  
45    private final int _thresholdNoChangeInMs;
46    private final HelixManager _manager;
47    private final ZkClient _zkClient;
48  
49    public ZKPathDataDumpTask(HelixManager manager, ZkClient zkClient, int thresholdNoChangeInMs)
50    {
51      _manager = manager;
52      _zkClient = zkClient;
53      logger.info("Scannning cluster statusUpdate " + manager.getClusterName()
54          + " thresholdNoChangeInMs: " + thresholdNoChangeInMs);
55      _thresholdNoChangeInMs = thresholdNoChangeInMs;
56    }
57  
58    @Override
59    public void run()
60    {
61      // For each record in status update and error node
62      // TODO: for now the status updates are dumped to cluster manager log4j log.
63      // We need to think if we should create per-instance log files that contains
64      // per-instance statusUpdates
65      // and errors
66      logger.info("Scannning status updates ...");
67      try
68      {
69        HelixDataAccessor accessor = _manager.getHelixDataAccessor();
70        Builder keyBuilder = accessor.keyBuilder();
71  
72        List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
73        for (String instanceName : instances)
74        {
75          scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
76              PropertyType.STATUSUPDATES), _thresholdNoChangeInMs);
77          scanPath(HelixUtil.getInstancePropertyPath(_manager.getClusterName(), instanceName,
78              PropertyType.ERRORS), _thresholdNoChangeInMs * 3);
79        }
80        scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
81            PropertyType.STATUSUPDATES_CONTROLLER), _thresholdNoChangeInMs);
82  
83        scanPath(HelixUtil.getControllerPropertyPath(_manager.getClusterName(),
84            PropertyType.ERRORS_CONTROLLER), _thresholdNoChangeInMs * 3);
85      } catch (Exception e)
86      {
87        logger.error(e);
88      }
89    }
90  
91    void scanPath(String path, int thresholdNoChangeInMs)
92    {
93      logger.info("Scannning path " + path);
94      List<String> subPaths = _zkClient.getChildren(path);
95      for (String subPath : subPaths)
96      {
97        try
98        {
99          String nextPath = path + "/" + subPath;
100         List<String> subSubPaths = _zkClient.getChildren(nextPath);
101         for (String subsubPath : subSubPaths)
102         {
103           try
104           {
105             checkAndDump(nextPath + "/" + subsubPath, thresholdNoChangeInMs);
106           } catch (Exception e)
107           {
108             logger.error(e);
109           }
110         }
111       } catch (Exception e)
112       {
113         logger.error(e);
114       }
115     }
116   }
117 
118   void checkAndDump(String path, int thresholdNoChangeInMs)
119   {
120     List<String> subPaths = _zkClient.getChildren(path);
121     if(subPaths.size() == 0)
122     {
123       subPaths.add("");
124     }
125     for (String subPath : subPaths)
126     {
127       String fullPath = subPath.length() > 0 ? path + "/" + subPath : path;
128       Stat pathStat = _zkClient.getStat(fullPath);
129 
130       long lastModifiedTimeInMs = pathStat.getMtime();
131       long nowInMs = new Date().getTime();
132       // logger.info(nowInMs + " " + lastModifiedTimeInMs + " " + fullPath);
133 
134       // Check the last modified time
135       if (nowInMs > lastModifiedTimeInMs)
136       {
137         long timeDiff = nowInMs - lastModifiedTimeInMs;
138         if (timeDiff > thresholdNoChangeInMs)
139         {
140           logger.info("Dumping status update path " + fullPath + " " + timeDiff + "MS has passed");
141           _zkClient.setZkSerializer(new ZNRecordSerializer());
142           ZNRecord record = _zkClient.readData(fullPath);
143 
144           // dump the node content into log file
145           ObjectMapper mapper = new ObjectMapper();
146           SerializationConfig serializationConfig = mapper.getSerializationConfig();
147           serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
148 
149           StringWriter sw = new StringWriter();
150           try
151           {
152             mapper.writeValue(sw, record);
153             logger.info(sw.toString());
154           } catch (Exception e)
155           {
156             logger.warn(
157                     "Exception during serialization in ZKPathDataDumpTask.checkAndDump. This can mostly be ignored",
158                     e);
159           }
160           // Delete the leaf data
161           _zkClient.deleteRecursive(fullPath);
162         }
163       }
164     }
165   }
166 }