View Javadoc

1   package org.apache.helix.messaging;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Date;
24  import java.util.List;
25  import java.util.Timer;
26  import java.util.TimerTask;
27  
28  import org.apache.helix.model.Message;
29  import org.apache.log4j.Logger;
30  
31  
32  public abstract class AsyncCallback
33  {
34  
35    private static Logger _logger = Logger.getLogger(AsyncCallback.class);
36    long _startTimeStamp = 0;
37    protected long _timeout = -1;
38    Timer _timer = null;
39    List<Message> _messagesSent;
40    protected final List<Message> _messageReplied = new ArrayList<Message>();
41    boolean _timedOut = false;
42    boolean _isInterrupted = false;
43  
44    /**
45     * Enforcing timeout to be set
46     * 
47     * @param timeout
48     */
49    public AsyncCallback(long timeout)
50    {
51      _logger.info("Setting time out to " + timeout + " ms");
52      _timeout = timeout;
53    }
54    
55    public AsyncCallback()
56    {
57      this(-1);
58    }
59  
60    public final void setTimeout(long timeout)
61    {
62      _logger.info("Setting time out to " + timeout + " ms");
63      _timeout = timeout;
64  
65    }
66  
67    public List<Message> getMessageReplied()
68    {
69      return _messageReplied;
70    }
71  
72    public boolean isInterrupted()
73    {
74      return _isInterrupted;
75    }
76  
77    public void setInterrupted(boolean b)
78    {
79      _isInterrupted = true;
80    }
81  
82    public synchronized final void onReply(Message message)
83    {
84      _logger.info("OnReply msg " + message.getMsgId());
85      if (!isDone())
86      {
87        _messageReplied.add(message);
88        try
89        {
90          onReplyMessage(message);
91        }
92        catch(Exception e) 
93        {
94          _logger.error(e);
95        }
96      }
97      if (isDone())
98      {
99        if(_timer != null)
100       {
101         _timer.cancel();
102       }
103       notifyAll();
104     }
105   }
106 
107   /**
108    * Default implementation will wait until every message sent gets a response
109    * 
110    * @return
111    */
112   public boolean isDone()
113   {
114     return _messageReplied.size() == _messagesSent.size();
115   }
116 
117   public boolean isTimedOut()
118   {
119     return _timedOut;
120   }
121 
122   final void setMessagesSent(List<Message> generatedMessage)
123   {
124     _messagesSent = generatedMessage;
125   }
126   
127   final void startTimer()
128   {
129     if (_timer == null && _timeout > 0)
130     {
131       if (_startTimeStamp == 0)
132       {
133         _startTimeStamp = new Date().getTime();
134       }
135       _timer = new Timer(true);
136       _timer.schedule(new TimeoutTask(this), _timeout);
137     }  
138   }
139   
140   public abstract void onTimeOut();
141 
142   public abstract void onReplyMessage(Message message);
143 
144   class TimeoutTask extends TimerTask
145   {
146     AsyncCallback _callback;
147 
148     public TimeoutTask(AsyncCallback asyncCallback)
149     {
150       _callback = asyncCallback;
151     }
152 
153     @Override
154     public void run()
155     {
156       try
157       {
158         synchronized (_callback)
159         {
160           _callback._timedOut = true;
161           _callback.notifyAll();
162           _callback.onTimeOut();
163         }
164       } 
165       catch (Exception e)
166       {
167         _logger.error(e);
168       }
169       finally
170       {
171         if(_timer != null)
172         {
173           _timer.cancel();
174         }
175       }
176     }
177   }
178 
179 }