View Javadoc

1   package org.apache.helix.messaging;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Date;
24  import java.util.List;
25  import java.util.UUID;
26  
27  import org.apache.helix.messaging.AsyncCallback;
28  import org.apache.helix.model.Message;
29  import org.testng.AssertJUnit;
30  import org.testng.annotations.Test;
31  
32  
33  public class TestAsyncCallback
34  {
35    class AsyncCallbackSample extends AsyncCallback
36    {
37      int _onTimeOutCalled = 0;
38      int _onReplyMessageCalled = 0;
39      @Override
40      public void onTimeOut()
41      {
42        // TODO Auto-generated method stub
43        _onTimeOutCalled ++;
44      }
45  
46      @Override
47      public void onReplyMessage(Message message)
48      {
49        _onReplyMessageCalled++;
50      }
51    }
52  
53    @Test()
54    public void testAsyncCallback() throws Exception
55    {
56      System.out.println("START TestAsyncCallback at " + new Date(System.currentTimeMillis()));
57      AsyncCallbackSample callback = new AsyncCallbackSample();
58      AssertJUnit.assertFalse(callback.isInterrupted());
59      AssertJUnit.assertFalse(callback.isTimedOut());
60      AssertJUnit.assertTrue(callback.getMessageReplied().size() == 0);
61  
62      int nMsgs = 5;
63  
64      List<Message> messageSent = new ArrayList<Message>();
65      for(int i = 0;i < nMsgs; i++)
66      {
67        messageSent.add(new Message("Test", UUID.randomUUID().toString()));
68      }
69  
70      callback.setMessagesSent(messageSent);
71  
72      for(int i = 0;i < nMsgs; i++)
73      {
74        AssertJUnit.assertFalse(callback.isDone());
75        callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
76      }
77      AssertJUnit.assertTrue(callback.isDone());
78  
79      AssertJUnit.assertTrue(callback._onTimeOutCalled == 0 );
80  
81      sleep(50);
82      callback = new AsyncCallbackSample();
83      callback.setMessagesSent(messageSent);
84      callback.setTimeout(1000);
85      sleep(50);
86      callback.startTimer();
87      AssertJUnit.assertFalse(callback.isTimedOut());
88      for(int i = 0;i < nMsgs - 1; i++)
89      {
90        sleep(50);
91        AssertJUnit.assertFalse(callback.isDone());
92        AssertJUnit.assertTrue(callback._onReplyMessageCalled == i);
93        callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
94      }
95      sleep(1000);
96      AssertJUnit.assertTrue(callback.isTimedOut());
97      AssertJUnit.assertTrue(callback._onTimeOutCalled == 1 );
98      AssertJUnit.assertFalse(callback.isDone());
99  
100     callback = new AsyncCallbackSample();
101     callback.setMessagesSent(messageSent);
102     callback.setTimeout(1000);
103     callback.startTimer();
104     sleep(50);
105     AssertJUnit.assertFalse(callback.isTimedOut());
106     for(int i = 0;i < nMsgs; i++)
107     {
108       AssertJUnit.assertFalse(callback.isDone());
109       sleep(50);
110       AssertJUnit.assertTrue(callback._onReplyMessageCalled == i);
111       callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
112     }
113     AssertJUnit.assertTrue(callback.isDone());
114     sleep(1300);
115     AssertJUnit.assertFalse(callback.isTimedOut());
116     AssertJUnit.assertTrue(callback._onTimeOutCalled == 0 );
117     System.out.println("END TestAsyncCallback at " + new Date(System.currentTimeMillis()));
118   }
119 
120   void sleep(int time)
121   {
122     try
123     {
124       Thread.sleep(time);
125     }
126     catch(Exception e)
127     {
128       System.out.println(e);
129     }
130   }
131 }