1 package org.apache.helix.messaging;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.List;
25 import java.util.Timer;
26 import java.util.TimerTask;
27
28 import org.apache.helix.model.Message;
29 import org.apache.log4j.Logger;
30
31
32 public abstract class AsyncCallback
33 {
34
35 private static Logger _logger = Logger.getLogger(AsyncCallback.class);
36 long _startTimeStamp = 0;
37 protected long _timeout = -1;
38 Timer _timer = null;
39 List<Message> _messagesSent;
40 protected final List<Message> _messageReplied = new ArrayList<Message>();
41 boolean _timedOut = false;
42 boolean _isInterrupted = false;
43
44
45
46
47
48
49 public AsyncCallback(long timeout)
50 {
51 _logger.info("Setting time out to " + timeout + " ms");
52 _timeout = timeout;
53 }
54
55 public AsyncCallback()
56 {
57 this(-1);
58 }
59
60 public final void setTimeout(long timeout)
61 {
62 _logger.info("Setting time out to " + timeout + " ms");
63 _timeout = timeout;
64
65 }
66
67 public List<Message> getMessageReplied()
68 {
69 return _messageReplied;
70 }
71
72 public boolean isInterrupted()
73 {
74 return _isInterrupted;
75 }
76
77 public void setInterrupted(boolean b)
78 {
79 _isInterrupted = true;
80 }
81
82 public synchronized final void onReply(Message message)
83 {
84 _logger.info("OnReply msg " + message.getMsgId());
85 if (!isDone())
86 {
87 _messageReplied.add(message);
88 try
89 {
90 onReplyMessage(message);
91 }
92 catch(Exception e)
93 {
94 _logger.error(e);
95 }
96 }
97 if (isDone())
98 {
99 if(_timer != null)
100 {
101 _timer.cancel();
102 }
103 notifyAll();
104 }
105 }
106
107
108
109
110
111
112 public boolean isDone()
113 {
114 return _messageReplied.size() == _messagesSent.size();
115 }
116
117 public boolean isTimedOut()
118 {
119 return _timedOut;
120 }
121
122 final void setMessagesSent(List<Message> generatedMessage)
123 {
124 _messagesSent = generatedMessage;
125 }
126
127 final void startTimer()
128 {
129 if (_timer == null && _timeout > 0)
130 {
131 if (_startTimeStamp == 0)
132 {
133 _startTimeStamp = new Date().getTime();
134 }
135 _timer = new Timer(true);
136 _timer.schedule(new TimeoutTask(this), _timeout);
137 }
138 }
139
140 public abstract void onTimeOut();
141
142 public abstract void onReplyMessage(Message message);
143
144 class TimeoutTask extends TimerTask
145 {
146 AsyncCallback _callback;
147
148 public TimeoutTask(AsyncCallback asyncCallback)
149 {
150 _callback = asyncCallback;
151 }
152
153 @Override
154 public void run()
155 {
156 try
157 {
158 synchronized (_callback)
159 {
160 _callback._timedOut = true;
161 _callback.notifyAll();
162 _callback.onTimeOut();
163 }
164 }
165 catch (Exception e)
166 {
167 _logger.error(e);
168 }
169 finally
170 {
171 if(_timer != null)
172 {
173 _timer.cancel();
174 }
175 }
176 }
177 }
178
179 }