1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.LinkedList;
23 import java.util.List;
24
25 import org.I0Itec.zkclient.IZkStateListener;
26 import org.I0Itec.zkclient.ZkConnection;
27 import org.apache.log4j.Logger;
28 import org.apache.zookeeper.Watcher.Event.KeeperState;
29
30 public class ZkStateChangeListener implements IZkStateListener
31 {
32 private volatile boolean _isConnected;
33 private volatile boolean _hasSessionExpired;
34 private final ZKHelixManager _zkHelixManager;
35
36
37
38
39 List<Long> _disconnectTimeHistory = new LinkedList<Long>();
40 int _timeWindowLengthMs;
41 int _maxDisconnectThreshold;
42
43 private static Logger logger = Logger.getLogger(ZkStateChangeListener.class);
44
45 public ZkStateChangeListener(ZKHelixManager zkHelixManager, int timeWindowLengthMs, int maxDisconnectThreshold)
46 {
47 this._zkHelixManager = zkHelixManager;
48 _timeWindowLengthMs = timeWindowLengthMs;
49
50
51 _maxDisconnectThreshold = maxDisconnectThreshold > 0 ? maxDisconnectThreshold : 1;
52 }
53
54 @Override
55 public void handleNewSession()
56 {
57
58
59
60 _isConnected = true;
61 _hasSessionExpired = false;
62 _zkHelixManager.handleNewSession();
63 }
64
65 @Override
66 public void handleStateChanged(KeeperState keeperState) throws Exception
67 {
68 switch (keeperState)
69 {
70 case SyncConnected:
71 ZkConnection zkConnection =
72 ((ZkConnection) _zkHelixManager._zkClient.getConnection());
73 logger.info("KeeperState: " + keeperState + ", zookeeper:" + zkConnection.getZookeeper());
74 _isConnected = true;
75 break;
76 case Disconnected:
77 logger.info("KeeperState:" + keeperState + ", disconnectedSessionId: "
78 + _zkHelixManager._sessionId + ", instance: "
79 + _zkHelixManager.getInstanceName() + ", type: "
80 + _zkHelixManager.getInstanceType());
81
82 _isConnected = false;
83
84
85 _disconnectTimeHistory.add(System.currentTimeMillis());
86 if(isFlapping())
87 {
88 logger.error("isFlapping() returns true, so disconnect the helix manager. " + _zkHelixManager.getInstanceName() + " "
89 + _maxDisconnectThreshold + " disconnects in " + _timeWindowLengthMs + " Ms.");
90 _zkHelixManager.disconnectInternal();
91 }
92 break;
93 case Expired:
94 logger.info("KeeperState:" + keeperState + ", expiredSessionId: "
95 + _zkHelixManager._sessionId + ", instance: "
96 + _zkHelixManager.getInstanceName() + ", type: "
97 + _zkHelixManager.getInstanceType());
98
99 _isConnected = false;
100 _hasSessionExpired = true;
101 break;
102 }
103 }
104
105 boolean isConnected()
106 {
107 return _isConnected;
108 }
109
110 void disconnect()
111 {
112 _isConnected = false;
113 }
114
115 boolean hasSessionExpired()
116 {
117 return _hasSessionExpired;
118 }
119
120
121
122
123
124 boolean isFlapping()
125 {
126 if(_disconnectTimeHistory.size() == 0)
127 {
128 return false;
129 }
130 long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
131
132 while((_disconnectTimeHistory.get(0) + _timeWindowLengthMs) < mostRecentTimestamp)
133 {
134 _disconnectTimeHistory.remove(0);
135 }
136 return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
137 }
138 }