1 package org.apache.helix.messaging.handling;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.UUID;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ExecutorService;
27
28 import org.apache.helix.HelixException;
29 import org.apache.helix.HelixManager;
30 import org.apache.helix.Mocks;
31 import org.apache.helix.NotificationContext;
32 import org.apache.helix.messaging.handling.HelixTaskExecutor;
33 import org.apache.helix.messaging.handling.HelixTaskResult;
34 import org.apache.helix.messaging.handling.MessageHandler;
35 import org.apache.helix.messaging.handling.MessageHandlerFactory;
36 import org.apache.helix.model.Message;
37 import org.apache.helix.model.Message.MessageState;
38 import org.testng.Assert;
39 import org.testng.AssertJUnit;
40 import org.testng.annotations.Test;
41
42
43 public class TestHelixTaskExecutor
44 {
45 public static class MockClusterManager extends Mocks.MockManager
46 {
47 @Override
48 public String getSessionId()
49 {
50 return "123";
51 }
52 }
53
54 class TestMessageHandlerFactory implements MessageHandlerFactory
55 {
56 int _handlersCreated = 0;
57 ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
58 class TestMessageHandler extends MessageHandler
59 {
60 public TestMessageHandler(Message message, NotificationContext context)
61 {
62 super(message, context);
63
64 }
65
66 @Override
67 public HelixTaskResult handleMessage() throws InterruptedException
68 {
69 HelixTaskResult result = new HelixTaskResult();
70 _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
71 Thread.currentThread().sleep(100);
72 result.setSuccess(true);
73 return result;
74 }
75
76 @Override
77 public void onError(Exception e, ErrorCode code, ErrorType type)
78 {
79
80
81 }
82 }
83 @Override
84 public MessageHandler createHandler(Message message,
85 NotificationContext context)
86 {
87
88 if(message.getMsgSubType()!= null && message.getMsgSubType().equals("EXCEPTION"))
89 {
90 throw new HelixException("Test Message handler exception, can ignore");
91 }
92 _handlersCreated++;
93 return new TestMessageHandler(message, context);
94 }
95
96 @Override
97 public String getMessageType()
98 {
99
100 return "TestingMessageHandler";
101 }
102
103 @Override
104 public void reset()
105 {
106
107
108 }
109 }
110
111 class TestMessageHandlerFactory2 extends TestMessageHandlerFactory
112 {
113 @Override
114 public String getMessageType()
115 {
116
117 return "TestingMessageHandler2";
118 }
119
120 }
121
122 class CancellableHandlerFactory implements MessageHandlerFactory
123 {
124
125 int _handlersCreated = 0;
126 ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
127 ConcurrentHashMap<String, String> _processingMsgIds = new ConcurrentHashMap<String, String>();
128 ConcurrentHashMap<String, String> _timedOutMsgIds = new ConcurrentHashMap<String, String>();
129 class CancellableHandler extends MessageHandler
130 {
131 public CancellableHandler(Message message, NotificationContext context)
132 {
133 super(message, context);
134
135 }
136 public boolean _interrupted = false;
137 @Override
138 public HelixTaskResult handleMessage() throws InterruptedException
139 {
140 HelixTaskResult result = new HelixTaskResult();
141 int sleepTimes = 15;
142 if(_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
143 {
144 sleepTimes = 10;
145 }
146 _processingMsgIds.put(_message.getMsgId(), _message.getMsgId());
147 try
148 {
149 for (int i = 0; i < sleepTimes; i++)
150 {
151 Thread.sleep(100);
152 }
153 }
154 catch (InterruptedException e)
155 {
156 _interrupted = true;
157 _timedOutMsgIds.put(_message.getMsgId(), "");
158 result.setInterrupted(true);
159 if(!_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
160 {
161 _message.getRecord().setSimpleField("Cancelcount", "1");
162 }
163 else
164 {
165 int c = Integer.parseInt( _message.getRecord().getSimpleField("Cancelcount"));
166 _message.getRecord().setSimpleField("Cancelcount", ""+(c + 1));
167 }
168 throw e;
169 }
170 _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
171 result.setSuccess(true);
172 return result;
173 }
174 @Override
175 public void onError(Exception e, ErrorCode code, ErrorType type)
176 {
177
178 _message.getRecord().setSimpleField("exception", e.getMessage());
179 }
180 }
181 @Override
182 public MessageHandler createHandler(Message message,
183 NotificationContext context)
184 {
185
186 _handlersCreated++;
187 return new CancellableHandler(message, context);
188 }
189
190 @Override
191 public String getMessageType()
192 {
193
194 return "Cancellable";
195 }
196
197 @Override
198 public void reset()
199 {
200
201 _handlersCreated = 0;
202 _processedMsgIds.clear();
203 _processingMsgIds.clear();
204 _timedOutMsgIds.clear();
205 }
206 }
207
208 @Test ()
209 public void testNormalMsgExecution() throws InterruptedException
210 {
211 System.out.println("START TestCMTaskExecutor.testNormalMsgExecution()");
212 HelixTaskExecutor executor = new HelixTaskExecutor();
213 HelixManager manager = new MockClusterManager();
214
215 TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
216 executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
217
218 TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
219 executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
220
221 NotificationContext changeContext = new NotificationContext(manager);
222 List<Message> msgList = new ArrayList<Message>();
223
224 int nMsgs1 = 5;
225 for(int i = 0; i < nMsgs1; i++)
226 {
227 Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
228 msg.setTgtSessionId(manager.getSessionId());
229 msg.setTgtName("Localhost_1123");
230 msg.setSrcName("127.101.1.23_2234");
231 msg.setCorrelationId(UUID.randomUUID().toString());
232 msgList.add(msg);
233 }
234
235
236 int nMsgs2 = 6;
237 for(int i = 0; i < nMsgs2; i++)
238 {
239 Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
240 msg.setTgtSessionId(manager.getSessionId());
241 msg.setTgtName("Localhost_1123");
242 msg.setSrcName("127.101.1.23_2234");
243 msg.setCorrelationId(UUID.randomUUID().toString());
244 msgList.add(msg);
245 }
246 executor.onMessage("someInstance", msgList, changeContext);
247
248 Thread.sleep(1000);
249
250 AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
251 AssertJUnit.assertTrue(factory2._processedMsgIds.size() == nMsgs2);
252 AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
253 AssertJUnit.assertTrue(factory2._handlersCreated == nMsgs2);
254
255 for(Message record : msgList)
256 {
257 AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(record.getId()) || factory2._processedMsgIds.containsKey(record.getId()));
258 AssertJUnit.assertFalse(factory._processedMsgIds.containsKey(record.getId()) && factory2._processedMsgIds.containsKey(record.getId()));
259
260 }
261 System.out.println("END TestCMTaskExecutor.testNormalMsgExecution()");
262 }
263
264 @Test ()
265 public void testUnknownTypeMsgExecution() throws InterruptedException
266 {
267 HelixTaskExecutor executor = new HelixTaskExecutor();
268 HelixManager manager = new MockClusterManager();
269
270 TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
271 executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
272
273 TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
274
275 NotificationContext changeContext = new NotificationContext(manager);
276 List<Message> msgList = new ArrayList<Message>();
277
278 int nMsgs1 = 5;
279 for(int i = 0; i < nMsgs1; i++)
280 {
281 Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
282 msg.setTgtSessionId(manager.getSessionId());
283 msg.setTgtName("Localhost_1123");
284 msg.setSrcName("127.101.1.23_2234");
285 msgList.add(msg);
286 }
287
288
289 int nMsgs2 = 4;
290 for(int i = 0; i < nMsgs2; i++)
291 {
292 Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
293 msg.setTgtSessionId(manager.getSessionId());
294 msg.setTgtName("Localhost_1123");
295 msg.setSrcName("127.101.1.23_2234");
296 msgList.add(msg);
297 }
298 executor.onMessage("someInstance", msgList, changeContext);
299
300 Thread.sleep(1000);
301
302 AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
303 AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
304 AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
305 AssertJUnit.assertTrue(factory2._handlersCreated == 0);
306
307 for(Message message : msgList)
308 {
309 if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
310 {
311 AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
312 }
313 }
314 }
315
316
317 @Test ()
318 public void testMsgSessionId() throws InterruptedException
319 {
320 HelixTaskExecutor executor = new HelixTaskExecutor();
321 HelixManager manager = new MockClusterManager();
322
323 TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
324 executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
325
326 TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
327 executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
328
329 NotificationContext changeContext = new NotificationContext(manager);
330 List<Message> msgList = new ArrayList<Message>();
331
332 int nMsgs1 = 5;
333 for(int i = 0; i < nMsgs1; i++)
334 {
335 Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
336 msg.setTgtSessionId("*");
337 msg.setTgtName("");
338 msgList.add(msg);
339 }
340
341
342 int nMsgs2 = 4;
343 for(int i = 0; i < nMsgs2; i++)
344 {
345 Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
346 msg.setTgtSessionId("some other session id");
347 msg.setTgtName("");
348 msgList.add(msg);
349 }
350 executor.onMessage("someInstance", msgList, changeContext);
351
352 Thread.sleep(1000);
353
354 AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
355 AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
356 AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
357 AssertJUnit.assertTrue(factory2._handlersCreated == 0);
358
359 for(Message message : msgList)
360 {
361 if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
362 {
363 AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
364 }
365 }
366 }
367
368 @Test()
369 public void testCreateHandlerException() throws InterruptedException
370 {
371 System.out.println("START TestCMTaskExecutor.testCreateHandlerException()");
372 HelixTaskExecutor executor = new HelixTaskExecutor();
373 HelixManager manager = new MockClusterManager();
374
375 TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
376 executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
377
378
379 NotificationContext changeContext = new NotificationContext(manager);
380 List<Message> msgList = new ArrayList<Message>();
381
382 int nMsgs1 = 5;
383 for(int i = 0; i < nMsgs1; i++)
384 {
385 Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
386 msg.setTgtSessionId(manager.getSessionId());
387 msg.setTgtName("Localhost_1123");
388 msg.setSrcName("127.101.1.23_2234");
389 msg.setCorrelationId(UUID.randomUUID().toString());
390 msgList.add(msg);
391 }
392 Message exceptionMsg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
393 exceptionMsg.setTgtSessionId(manager.getSessionId());
394 exceptionMsg.setMsgSubType("EXCEPTION");
395 exceptionMsg.setTgtName("Localhost_1123");
396 exceptionMsg.setSrcName("127.101.1.23_2234");
397 exceptionMsg.setCorrelationId(UUID.randomUUID().toString());
398 msgList.add(exceptionMsg);
399
400 executor.onMessage("someInstance", msgList, changeContext);
401
402 Thread.sleep(1000);
403
404 AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
405 AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
406
407 AssertJUnit.assertTrue(exceptionMsg.getMsgState() == MessageState.UNPROCESSABLE);
408 System.out.println("END TestCMTaskExecutor.testCreateHandlerException()");
409 }
410
411 @Test ()
412 public void testTaskCancellation() throws InterruptedException
413 {
414 HelixTaskExecutor executor = new HelixTaskExecutor();
415 HelixManager manager = new MockClusterManager();
416
417 CancellableHandlerFactory factory = new CancellableHandlerFactory();
418 executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
419
420 NotificationContext changeContext = new NotificationContext(manager);
421 List<Message> msgList = new ArrayList<Message>();
422
423 int nMsgs1 = 0;
424 for(int i = 0; i < nMsgs1; i++)
425 {
426 Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
427 msg.setTgtSessionId("*");
428 msg.setTgtName("Localhost_1123");
429 msg.setSrcName("127.101.1.23_2234");
430 msgList.add(msg);
431 }
432
433 List<Message> msgListToCancel = new ArrayList<Message>();
434 int nMsgs2 = 4;
435 for(int i = 0; i < nMsgs2; i++)
436 {
437 Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
438 msg.setTgtSessionId("*");
439 msgList.add(msg);
440 msg.setTgtName("Localhost_1123");
441 msg.setSrcName("127.101.1.23_2234");
442 msgListToCancel.add(msg);
443 }
444 executor.onMessage("someInstance", msgList, changeContext);
445 Thread.sleep(500);
446 for(int i = 0; i < nMsgs2; i++)
447 {
448
449 HelixTask task = new HelixTask(msgListToCancel.get(i), changeContext, null, null);
450 executor.cancelTask(task);
451 }
452 Thread.sleep(1500);
453
454 AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
455 AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1 + nMsgs2);
456
457 AssertJUnit.assertTrue(factory._processingMsgIds.size() == nMsgs1 + nMsgs2);
458
459 for(Message message : msgList)
460 {
461 if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
462 {
463 AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(message.getId()));
464 }
465 }
466 }
467
468
469 @Test ()
470 public void testShutdown() throws InterruptedException
471 {
472 System.out.println("START TestCMTaskExecutor.testShutdown()");
473 HelixTaskExecutor executor = new HelixTaskExecutor();
474 HelixManager manager = new MockClusterManager();
475
476 TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
477 executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
478
479 TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
480 executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
481
482 CancellableHandlerFactory factory3 = new CancellableHandlerFactory();
483 executor.registerMessageHandlerFactory(factory3.getMessageType(), factory3);
484 int nMsg1 = 10, nMsg2 = 10, nMsg3 = 10;
485 List<Message> msgList = new ArrayList<Message>();
486
487 for(int i = 0; i < nMsg1; i++)
488 {
489 Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
490 msg.setTgtSessionId("*");
491 msg.setTgtName("Localhost_1123");
492 msg.setSrcName("127.101.1.23_2234");
493 msgList.add(msg);
494 }
495
496 for(int i = 0; i < nMsg2; i++)
497 {
498 Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
499 msg.setTgtSessionId("*");
500 msgList.add(msg);
501 msg.setTgtName("Localhost_1123");
502 msg.setSrcName("127.101.1.23_2234");
503 msgList.add(msg);
504 }
505
506 for(int i = 0; i < nMsg3; i++)
507 {
508 Message msg = new Message(factory3.getMessageType(), UUID.randomUUID().toString());
509 msg.setTgtSessionId("*");
510 msgList.add(msg);
511 msg.setTgtName("Localhost_1123");
512 msg.setSrcName("127.101.1.23_2234");
513 msgList.add(msg);
514 }
515 NotificationContext changeContext = new NotificationContext(manager);
516 executor.onMessage("some", msgList, changeContext);
517 Thread.sleep(500);
518 for(ExecutorService svc : executor._executorMap.values())
519 {
520 Assert.assertFalse(svc.isShutdown());
521 }
522 Assert.assertTrue(factory._processedMsgIds.size() > 0);
523 executor.shutdown();
524 for(ExecutorService svc : executor._executorMap.values())
525 {
526 Assert.assertTrue(svc.isShutdown());
527 }
528 System.out.println("END TestCMTaskExecutor.testShutdown()");
529 }
530
531 @Test ()
532 public void testNoRetry() throws InterruptedException
533 {
534
535
536 HelixTaskExecutor executor = new HelixTaskExecutor();
537 HelixManager manager = new MockClusterManager();
538
539 CancellableHandlerFactory factory = new CancellableHandlerFactory();
540 executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
541
542 NotificationContext changeContext = new NotificationContext(manager);
543
544 List<Message> msgList = new ArrayList<Message>();
545 int nMsgs2 = 4;
546
547 for(int i = 0; i < nMsgs2; i++)
548 {
549 Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
550 msg.setTgtSessionId("*");
551 msg.setTgtName("Localhost_1123");
552 msg.setSrcName("127.101.1.23_2234");
553 msg.setExecutionTimeout((i+1) * 600);
554 msgList.add(msg);
555 }
556 executor.onMessage("someInstance", msgList, changeContext);
557
558 Thread.sleep(4000);
559
560 AssertJUnit.assertTrue(factory._handlersCreated == nMsgs2);
561 AssertJUnit.assertEquals(factory._timedOutMsgIds.size() , 2);
562
563 for(int i = 0; i<nMsgs2 - 2; i++)
564 {
565 if(msgList.get(i).getMsgType().equalsIgnoreCase(factory.getMessageType()))
566 {
567 AssertJUnit.assertTrue(msgList.get(i).getRecord().getSimpleFields().containsKey("Cancelcount"));
568 AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
569 }
570 }
571 }
572
573 @Test ()
574 public void testRetryOnce() throws InterruptedException
575 {
576
577
578
579
580 HelixTaskExecutor executor = new HelixTaskExecutor();
581 HelixManager manager = new MockClusterManager();
582
583 CancellableHandlerFactory factory = new CancellableHandlerFactory();
584 executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
585
586 NotificationContext changeContext = new NotificationContext(manager);
587
588 List<Message> msgList = new ArrayList<Message>();
589
590
591
592
593 int nMsgs2 = 4;
594 for(int i = 0; i < nMsgs2; i++)
595 {
596 Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
597 msg.setTgtSessionId("*");
598 msg.setTgtName("Localhost_1123");
599 msg.setSrcName("127.101.1.23_2234");
600 msg.setExecutionTimeout((i+1) * 600);
601 msg.setRetryCount(1);
602 msgList.add(msg);
603 }
604 executor.onMessage("someInstance", msgList, changeContext);
605 Thread.sleep(3500);
606 AssertJUnit.assertEquals(factory._processedMsgIds.size(),3);
607 AssertJUnit.assertTrue(msgList.get(0).getRecord().getSimpleField("Cancelcount").equals("2"));
608 AssertJUnit.assertTrue(msgList.get(1).getRecord().getSimpleField("Cancelcount").equals("1"));
609 AssertJUnit.assertEquals(factory._timedOutMsgIds.size(),2);
610 AssertJUnit.assertTrue(executor._taskMap.size() == 0);
611
612 }
613 }