View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.LinkedList;
23  import java.util.List;
24  
25  import org.I0Itec.zkclient.IZkStateListener;
26  import org.I0Itec.zkclient.ZkConnection;
27  import org.apache.log4j.Logger;
28  import org.apache.zookeeper.Watcher.Event.KeeperState;
29  
30  public class ZkStateChangeListener implements IZkStateListener
31  {
32    private volatile boolean _isConnected;
33    private volatile boolean _hasSessionExpired;
34    private final ZKHelixManager _zkHelixManager;
35    
36    // Keep track of timestamps that zk State has become Disconnected
37    // If in a _timeWindowLengthMs window zk State has become Disconnected 
38    // for more than_maxDisconnectThreshold times disconnect the zkHelixManager
39    List<Long> _disconnectTimeHistory = new LinkedList<Long>();
40    int _timeWindowLengthMs;
41    int _maxDisconnectThreshold;
42  
43    private static Logger logger = Logger.getLogger(ZkStateChangeListener.class);
44  
45    public ZkStateChangeListener(ZKHelixManager zkHelixManager, int timeWindowLengthMs, int maxDisconnectThreshold)
46    {
47      this._zkHelixManager = zkHelixManager;
48      _timeWindowLengthMs = timeWindowLengthMs;
49      // _maxDisconnectThreshold min value is 1. 
50      // We don't want to disconnect from zk for the first time zkState become Disconnected
51      _maxDisconnectThreshold = maxDisconnectThreshold > 0 ? maxDisconnectThreshold : 1;
52    }
53  
54    @Override
55    public void handleNewSession()
56    {
57      // TODO:bug in zkclient .
58      // zkclient does not invoke handleStateChanged when a session expires but
59      // directly invokes handleNewSession
60      _isConnected = true;
61      _hasSessionExpired = false;
62      _zkHelixManager.handleNewSession();
63    }
64  
65    @Override
66    public void handleStateChanged(KeeperState keeperState) throws Exception
67    {
68      switch (keeperState)
69      {
70      case SyncConnected:
71        ZkConnection zkConnection =
72            ((ZkConnection) _zkHelixManager._zkClient.getConnection());
73        logger.info("KeeperState: " + keeperState + ", zookeeper:" + zkConnection.getZookeeper());
74        _isConnected = true;
75        break;
76      case Disconnected:
77        logger.info("KeeperState:" + keeperState + ", disconnectedSessionId: "
78            + _zkHelixManager._sessionId + ", instance: "
79            + _zkHelixManager.getInstanceName() + ", type: "
80            + _zkHelixManager.getInstanceType());
81  
82        _isConnected = false;
83        // Track the time stamp that the disconnected happens, then check history and see if
84        // we should disconnect the _zkHelixManager
85        _disconnectTimeHistory.add(System.currentTimeMillis());
86        if(isFlapping())
87        {
88          logger.error("isFlapping() returns true, so disconnect the helix manager. " + _zkHelixManager.getInstanceName() + " "
89            + _maxDisconnectThreshold + " disconnects in " + _timeWindowLengthMs + " Ms."); 
90          _zkHelixManager.disconnectInternal();
91        }
92        break;
93      case Expired:
94        logger.info("KeeperState:" + keeperState + ", expiredSessionId: "
95            + _zkHelixManager._sessionId + ", instance: "
96            + _zkHelixManager.getInstanceName() + ", type: "
97            + _zkHelixManager.getInstanceType());
98  
99        _isConnected = false;
100       _hasSessionExpired = true;
101       break;
102     }
103   }
104 
105   boolean isConnected()
106   {
107     return _isConnected;
108   }
109 
110   void disconnect()
111   {
112     _isConnected = false;
113   }
114 
115   boolean hasSessionExpired()
116   {
117     return _hasSessionExpired;
118   }
119   
120   /**
121    * If zk state has changed into Disconnected for _maxDisconnectThreshold times during previous _timeWindowLengthMs Ms
122    * time window, we think that there are something wrong going on and disconnect the zkHelixManager from zk.
123    * */
124   boolean isFlapping()
125   {
126     if(_disconnectTimeHistory.size() == 0)
127     {
128       return false;
129     }
130     long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
131     // Remove disconnect history timestamp that are older than _timeWindowLengthMs ago
132     while((_disconnectTimeHistory.get(0) + _timeWindowLengthMs) < mostRecentTimestamp)
133     {
134       _disconnectTimeHistory.remove(0);
135     }
136     return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
137   }
138 }