1 package org.apache.helix.messaging;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.List;
25 import java.util.UUID;
26
27 import org.apache.helix.messaging.AsyncCallback;
28 import org.apache.helix.model.Message;
29 import org.testng.AssertJUnit;
30 import org.testng.annotations.Test;
31
32
33 public class TestAsyncCallback
34 {
35 class AsyncCallbackSample extends AsyncCallback
36 {
37 int _onTimeOutCalled = 0;
38 int _onReplyMessageCalled = 0;
39 @Override
40 public void onTimeOut()
41 {
42
43 _onTimeOutCalled ++;
44 }
45
46 @Override
47 public void onReplyMessage(Message message)
48 {
49 _onReplyMessageCalled++;
50 }
51 }
52
53 @Test()
54 public void testAsyncCallback() throws Exception
55 {
56 System.out.println("START TestAsyncCallback at " + new Date(System.currentTimeMillis()));
57 AsyncCallbackSample callback = new AsyncCallbackSample();
58 AssertJUnit.assertFalse(callback.isInterrupted());
59 AssertJUnit.assertFalse(callback.isTimedOut());
60 AssertJUnit.assertTrue(callback.getMessageReplied().size() == 0);
61
62 int nMsgs = 5;
63
64 List<Message> messageSent = new ArrayList<Message>();
65 for(int i = 0;i < nMsgs; i++)
66 {
67 messageSent.add(new Message("Test", UUID.randomUUID().toString()));
68 }
69
70 callback.setMessagesSent(messageSent);
71
72 for(int i = 0;i < nMsgs; i++)
73 {
74 AssertJUnit.assertFalse(callback.isDone());
75 callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
76 }
77 AssertJUnit.assertTrue(callback.isDone());
78
79 AssertJUnit.assertTrue(callback._onTimeOutCalled == 0 );
80
81 sleep(50);
82 callback = new AsyncCallbackSample();
83 callback.setMessagesSent(messageSent);
84 callback.setTimeout(1000);
85 sleep(50);
86 callback.startTimer();
87 AssertJUnit.assertFalse(callback.isTimedOut());
88 for(int i = 0;i < nMsgs - 1; i++)
89 {
90 sleep(50);
91 AssertJUnit.assertFalse(callback.isDone());
92 AssertJUnit.assertTrue(callback._onReplyMessageCalled == i);
93 callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
94 }
95 sleep(1000);
96 AssertJUnit.assertTrue(callback.isTimedOut());
97 AssertJUnit.assertTrue(callback._onTimeOutCalled == 1 );
98 AssertJUnit.assertFalse(callback.isDone());
99
100 callback = new AsyncCallbackSample();
101 callback.setMessagesSent(messageSent);
102 callback.setTimeout(1000);
103 callback.startTimer();
104 sleep(50);
105 AssertJUnit.assertFalse(callback.isTimedOut());
106 for(int i = 0;i < nMsgs; i++)
107 {
108 AssertJUnit.assertFalse(callback.isDone());
109 sleep(50);
110 AssertJUnit.assertTrue(callback._onReplyMessageCalled == i);
111 callback.onReply(new Message("TestReply", UUID.randomUUID().toString()));
112 }
113 AssertJUnit.assertTrue(callback.isDone());
114 sleep(1300);
115 AssertJUnit.assertFalse(callback.isTimedOut());
116 AssertJUnit.assertTrue(callback._onTimeOutCalled == 0 );
117 System.out.println("END TestAsyncCallback at " + new Date(System.currentTimeMillis()));
118 }
119
120 void sleep(int time)
121 {
122 try
123 {
124 Thread.sleep(time);
125 }
126 catch(Exception e)
127 {
128 System.out.println(e);
129 }
130 }
131 }