View Javadoc

1   package org.apache.helix.messaging.handling;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.UUID;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.ExecutorService;
27  
28  import org.apache.helix.HelixException;
29  import org.apache.helix.HelixManager;
30  import org.apache.helix.Mocks;
31  import org.apache.helix.NotificationContext;
32  import org.apache.helix.messaging.handling.HelixTaskExecutor;
33  import org.apache.helix.messaging.handling.HelixTaskResult;
34  import org.apache.helix.messaging.handling.MessageHandler;
35  import org.apache.helix.messaging.handling.MessageHandlerFactory;
36  import org.apache.helix.model.Message;
37  import org.apache.helix.model.Message.MessageState;
38  import org.testng.Assert;
39  import org.testng.AssertJUnit;
40  import org.testng.annotations.Test;
41  
42  
43  public class TestHelixTaskExecutor
44  {
45    public static class MockClusterManager extends Mocks.MockManager
46    {
47      @Override
48      public String getSessionId()
49      {
50        return "123";
51      }
52    }
53  
54    class TestMessageHandlerFactory implements MessageHandlerFactory
55    {
56      int _handlersCreated = 0;
57      ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
58      class TestMessageHandler extends MessageHandler
59      {
60        public TestMessageHandler(Message message, NotificationContext context)
61        {
62          super(message, context);
63          // TODO Auto-generated constructor stub
64        }
65  
66        @Override
67        public HelixTaskResult handleMessage() throws InterruptedException
68        {
69          HelixTaskResult result = new HelixTaskResult();
70          _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
71          Thread.currentThread().sleep(100);
72          result.setSuccess(true);
73          return result;
74        }
75  
76        @Override
77        public void onError(Exception e, ErrorCode code, ErrorType type)
78        {
79          // TODO Auto-generated method stub
80          
81        }
82      }
83      @Override
84      public MessageHandler createHandler(Message message,
85          NotificationContext context)
86      {
87        // TODO Auto-generated method stub
88        if(message.getMsgSubType()!= null && message.getMsgSubType().equals("EXCEPTION"))
89        {
90          throw new HelixException("Test Message handler exception, can ignore");
91        }
92        _handlersCreated++;
93        return new TestMessageHandler(message, context);
94      }
95  
96      @Override
97      public String getMessageType()
98      {
99        // TODO Auto-generated method stub
100       return "TestingMessageHandler";
101     }
102 
103     @Override
104     public void reset()
105     {
106       // TODO Auto-generated method stub
107 
108     }
109   }
110 
111   class TestMessageHandlerFactory2 extends TestMessageHandlerFactory
112   {
113     @Override
114     public String getMessageType()
115     {
116       // TODO Auto-generated method stub
117       return "TestingMessageHandler2";
118     }
119     
120   }
121 
122   class CancellableHandlerFactory implements MessageHandlerFactory
123   {
124 
125     int _handlersCreated = 0;
126     ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>();
127     ConcurrentHashMap<String, String> _processingMsgIds = new ConcurrentHashMap<String, String>();
128     ConcurrentHashMap<String, String> _timedOutMsgIds = new ConcurrentHashMap<String, String>();
129     class CancellableHandler extends MessageHandler
130     {
131       public CancellableHandler(Message message, NotificationContext context)
132       {
133         super(message, context);
134         // TODO Auto-generated constructor stub
135       }
136       public boolean _interrupted = false;
137       @Override
138       public HelixTaskResult handleMessage() throws InterruptedException
139       {
140         HelixTaskResult result = new HelixTaskResult();
141         int sleepTimes = 15;
142         if(_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
143         {
144           sleepTimes = 10;
145         }
146         _processingMsgIds.put(_message.getMsgId(), _message.getMsgId());
147         try
148         {
149           for (int i = 0; i < sleepTimes; i++)
150           {
151             Thread.sleep(100);
152           }
153         } 
154         catch (InterruptedException e)
155         {
156           _interrupted = true;
157           _timedOutMsgIds.put(_message.getMsgId(), "");
158           result.setInterrupted(true);
159           if(!_message.getRecord().getSimpleFields().containsKey("Cancelcount"))
160           {
161             _message.getRecord().setSimpleField("Cancelcount", "1");
162           }
163           else
164           {
165             int c = Integer.parseInt( _message.getRecord().getSimpleField("Cancelcount"));
166             _message.getRecord().setSimpleField("Cancelcount", ""+(c + 1));
167           }
168           throw e;
169         }
170         _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
171         result.setSuccess(true);
172         return result;
173       }
174       @Override
175       public void onError(Exception e, ErrorCode code, ErrorType type)
176       {
177         // TODO Auto-generated method stub
178         _message.getRecord().setSimpleField("exception", e.getMessage());
179       }
180     }
181     @Override
182     public MessageHandler createHandler(Message message,
183         NotificationContext context)
184     {
185       // TODO Auto-generated method stub
186       _handlersCreated++;
187       return new CancellableHandler(message, context);
188     }
189 
190     @Override
191     public String getMessageType()
192     {
193       // TODO Auto-generated method stub
194       return "Cancellable";
195     }
196 
197     @Override
198     public void reset()
199     {
200       // TODO Auto-generated method stub
201       _handlersCreated = 0;
202       _processedMsgIds.clear();
203        _processingMsgIds.clear();
204       _timedOutMsgIds.clear();
205     }
206   }
207 
208   @Test ()
209   public void testNormalMsgExecution() throws InterruptedException
210   {
211     System.out.println("START TestCMTaskExecutor.testNormalMsgExecution()");
212     HelixTaskExecutor executor = new HelixTaskExecutor();
213     HelixManager manager = new MockClusterManager();
214 
215     TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
216     executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
217 
218     TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
219     executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
220 
221     NotificationContext changeContext = new NotificationContext(manager);
222     List<Message> msgList = new ArrayList<Message>();
223 
224     int nMsgs1 = 5;
225     for(int i = 0; i < nMsgs1; i++)
226     {
227       Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
228       msg.setTgtSessionId(manager.getSessionId());
229       msg.setTgtName("Localhost_1123");
230       msg.setSrcName("127.101.1.23_2234");
231       msg.setCorrelationId(UUID.randomUUID().toString());
232       msgList.add(msg);
233     }
234 
235 
236     int nMsgs2 = 6;
237     for(int i = 0; i < nMsgs2; i++)
238     {
239       Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
240       msg.setTgtSessionId(manager.getSessionId());
241       msg.setTgtName("Localhost_1123");
242       msg.setSrcName("127.101.1.23_2234");
243       msg.setCorrelationId(UUID.randomUUID().toString());
244       msgList.add(msg);
245     }
246     executor.onMessage("someInstance", msgList, changeContext);
247 
248     Thread.sleep(1000);
249 
250     AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
251     AssertJUnit.assertTrue(factory2._processedMsgIds.size() == nMsgs2);
252     AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
253     AssertJUnit.assertTrue(factory2._handlersCreated == nMsgs2);
254 
255     for(Message record : msgList)
256     {
257       AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(record.getId()) || factory2._processedMsgIds.containsKey(record.getId()));
258       AssertJUnit.assertFalse(factory._processedMsgIds.containsKey(record.getId()) && factory2._processedMsgIds.containsKey(record.getId()));
259 
260     }
261     System.out.println("END TestCMTaskExecutor.testNormalMsgExecution()");
262   }
263 
264   @Test ()
265   public void testUnknownTypeMsgExecution() throws InterruptedException
266   {
267     HelixTaskExecutor executor = new HelixTaskExecutor();
268     HelixManager manager = new MockClusterManager();
269 
270     TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
271     executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
272 
273     TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
274 
275     NotificationContext changeContext = new NotificationContext(manager);
276     List<Message> msgList = new ArrayList<Message>();
277 
278     int nMsgs1 = 5;
279     for(int i = 0; i < nMsgs1; i++)
280     {
281       Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
282       msg.setTgtSessionId(manager.getSessionId());
283       msg.setTgtName("Localhost_1123");
284       msg.setSrcName("127.101.1.23_2234");
285       msgList.add(msg);
286     }
287 
288 
289     int nMsgs2 = 4;
290     for(int i = 0; i < nMsgs2; i++)
291     {
292       Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
293       msg.setTgtSessionId(manager.getSessionId());
294       msg.setTgtName("Localhost_1123");
295       msg.setSrcName("127.101.1.23_2234");
296       msgList.add(msg);
297     }
298     executor.onMessage("someInstance", msgList, changeContext);
299 
300     Thread.sleep(1000);
301 
302     AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
303     AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
304     AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
305     AssertJUnit.assertTrue(factory2._handlersCreated == 0);
306 
307     for(Message message : msgList)
308     {
309       if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
310       {
311         AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
312       }
313     }
314   }
315 
316 
317   @Test ()
318   public void testMsgSessionId() throws InterruptedException
319   {
320     HelixTaskExecutor executor = new HelixTaskExecutor();
321     HelixManager manager = new MockClusterManager();
322 
323     TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
324     executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
325 
326     TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
327     executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
328 
329     NotificationContext changeContext = new NotificationContext(manager);
330     List<Message> msgList = new ArrayList<Message>();
331 
332     int nMsgs1 = 5;
333     for(int i = 0; i < nMsgs1; i++)
334     {
335       Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
336       msg.setTgtSessionId("*");
337       msg.setTgtName("");
338       msgList.add(msg);
339     }
340 
341 
342     int nMsgs2 = 4;
343     for(int i = 0; i < nMsgs2; i++)
344     {
345       Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
346       msg.setTgtSessionId("some other session id");
347       msg.setTgtName("");
348       msgList.add(msg);
349     }
350     executor.onMessage("someInstance", msgList, changeContext);
351 
352     Thread.sleep(1000);
353 
354     AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
355     AssertJUnit.assertTrue(factory2._processedMsgIds.size() == 0);
356     AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
357     AssertJUnit.assertTrue(factory2._handlersCreated == 0);
358 
359     for(Message message : msgList)
360     {
361       if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
362       {
363         AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
364       }
365     }
366   }
367 
368   @Test()
369   public void testCreateHandlerException() throws InterruptedException
370   {
371     System.out.println("START TestCMTaskExecutor.testCreateHandlerException()");
372     HelixTaskExecutor executor = new HelixTaskExecutor();
373     HelixManager manager = new MockClusterManager();
374 
375     TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
376     executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
377     
378 
379     NotificationContext changeContext = new NotificationContext(manager);
380     List<Message> msgList = new ArrayList<Message>();
381 
382     int nMsgs1 = 5;
383     for(int i = 0; i < nMsgs1; i++)
384     {
385       Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
386       msg.setTgtSessionId(manager.getSessionId());
387       msg.setTgtName("Localhost_1123");
388       msg.setSrcName("127.101.1.23_2234");
389       msg.setCorrelationId(UUID.randomUUID().toString());
390       msgList.add(msg);
391     }
392     Message exceptionMsg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
393     exceptionMsg.setTgtSessionId(manager.getSessionId());
394     exceptionMsg.setMsgSubType("EXCEPTION");
395     exceptionMsg.setTgtName("Localhost_1123");
396     exceptionMsg.setSrcName("127.101.1.23_2234");
397     exceptionMsg.setCorrelationId(UUID.randomUUID().toString());
398     msgList.add(exceptionMsg);
399     
400     executor.onMessage("someInstance", msgList, changeContext);
401 
402     Thread.sleep(1000);
403 
404     AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
405     AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
406 
407     AssertJUnit.assertTrue(exceptionMsg.getMsgState() == MessageState.UNPROCESSABLE);
408     System.out.println("END TestCMTaskExecutor.testCreateHandlerException()");
409   }
410 
411   @Test ()
412   public void testTaskCancellation() throws InterruptedException
413   {
414     HelixTaskExecutor executor = new HelixTaskExecutor();
415     HelixManager manager = new MockClusterManager();
416 
417     CancellableHandlerFactory factory = new CancellableHandlerFactory();
418     executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
419 
420     NotificationContext changeContext = new NotificationContext(manager);
421     List<Message> msgList = new ArrayList<Message>();
422 
423     int nMsgs1 = 0;
424     for(int i = 0; i < nMsgs1; i++)
425     {
426       Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
427       msg.setTgtSessionId("*");
428       msg.setTgtName("Localhost_1123");
429       msg.setSrcName("127.101.1.23_2234");
430       msgList.add(msg);
431     }
432 
433     List<Message> msgListToCancel = new ArrayList<Message>();
434     int nMsgs2 = 4;
435     for(int i = 0; i < nMsgs2; i++)
436     {
437       Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
438       msg.setTgtSessionId("*");
439       msgList.add(msg);
440       msg.setTgtName("Localhost_1123");
441       msg.setSrcName("127.101.1.23_2234");
442       msgListToCancel.add(msg);
443     }
444     executor.onMessage("someInstance", msgList, changeContext);
445     Thread.sleep(500);
446     for(int i = 0; i < nMsgs2; i++)
447     {
448       // executor.cancelTask(msgListToCancel.get(i), changeContext);
449       HelixTask task = new HelixTask(msgListToCancel.get(i), changeContext, null, null);
450       executor.cancelTask(task);
451     }
452     Thread.sleep(1500);
453 
454     AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
455     AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1 + nMsgs2);
456 
457     AssertJUnit.assertTrue(factory._processingMsgIds.size() == nMsgs1 + nMsgs2);
458 
459     for(Message message : msgList)
460     {
461       if(message.getMsgType().equalsIgnoreCase(factory.getMessageType()))
462       {
463         AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(message.getId()));
464       }
465     }
466   }
467 
468 
469   @Test ()
470   public void testShutdown() throws InterruptedException
471   {
472      System.out.println("START TestCMTaskExecutor.testShutdown()");
473      HelixTaskExecutor executor = new HelixTaskExecutor();
474       HelixManager manager = new MockClusterManager();
475 
476       TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
477       executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
478 
479       TestMessageHandlerFactory2 factory2 = new TestMessageHandlerFactory2();
480       executor.registerMessageHandlerFactory(factory2.getMessageType(), factory2);
481 
482       CancellableHandlerFactory factory3 = new CancellableHandlerFactory();
483       executor.registerMessageHandlerFactory(factory3.getMessageType(), factory3);
484       int nMsg1 = 10, nMsg2 = 10, nMsg3 = 10;
485       List<Message> msgList = new ArrayList<Message>();
486 
487       for(int i = 0; i < nMsg1; i++)
488       {
489         Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
490         msg.setTgtSessionId("*");
491         msg.setTgtName("Localhost_1123");
492         msg.setSrcName("127.101.1.23_2234");
493         msgList.add(msg);
494       }
495 
496       for(int i = 0; i < nMsg2; i++)
497       {
498         Message msg = new Message(factory2.getMessageType(), UUID.randomUUID().toString());
499         msg.setTgtSessionId("*");
500         msgList.add(msg);
501         msg.setTgtName("Localhost_1123");
502         msg.setSrcName("127.101.1.23_2234");
503         msgList.add(msg);
504       }
505 
506       for(int i = 0; i < nMsg3; i++)
507       {
508         Message msg = new Message(factory3.getMessageType(), UUID.randomUUID().toString());
509         msg.setTgtSessionId("*");
510         msgList.add(msg);
511         msg.setTgtName("Localhost_1123");
512         msg.setSrcName("127.101.1.23_2234");
513         msgList.add(msg);
514       }
515       NotificationContext changeContext = new NotificationContext(manager);
516       executor.onMessage("some", msgList, changeContext);
517       Thread.sleep(500);
518       for(ExecutorService svc : executor._executorMap.values())
519       {
520         Assert.assertFalse(svc.isShutdown());
521       }
522       Assert.assertTrue(factory._processedMsgIds.size() > 0);
523       executor.shutdown();
524       for(ExecutorService svc : executor._executorMap.values())
525       {
526         Assert.assertTrue(svc.isShutdown());
527       }
528       System.out.println("END TestCMTaskExecutor.testShutdown()");
529   }
530   
531   @Test ()
532   public void testNoRetry() throws InterruptedException
533   {
534 //    String p = "test_";
535 //    System.out.println(p.substring(p.lastIndexOf('_')+1));
536     HelixTaskExecutor executor = new HelixTaskExecutor();
537     HelixManager manager = new MockClusterManager();
538 
539     CancellableHandlerFactory factory = new CancellableHandlerFactory();
540     executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
541 
542     NotificationContext changeContext = new NotificationContext(manager);
543 
544     List<Message> msgList = new ArrayList<Message>();
545     int nMsgs2 = 4;
546     // Test the case in which retry = 0
547     for(int i = 0; i < nMsgs2; i++)
548     {
549       Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
550       msg.setTgtSessionId("*");
551       msg.setTgtName("Localhost_1123");
552       msg.setSrcName("127.101.1.23_2234");
553       msg.setExecutionTimeout((i+1) * 600);
554       msgList.add(msg);
555     }
556     executor.onMessage("someInstance", msgList, changeContext);
557     
558     Thread.sleep(4000);
559 
560     AssertJUnit.assertTrue(factory._handlersCreated ==  nMsgs2);
561     AssertJUnit.assertEquals(factory._timedOutMsgIds.size() , 2);
562     //AssertJUnit.assertFalse(msgList.get(0).getRecord().getSimpleFields().containsKey("TimeOut"));
563     for(int i = 0; i<nMsgs2 - 2; i++)
564     {
565       if(msgList.get(i).getMsgType().equalsIgnoreCase(factory.getMessageType()))
566       {
567         AssertJUnit.assertTrue(msgList.get(i).getRecord().getSimpleFields().containsKey("Cancelcount"));
568         AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
569       }
570     }
571   }
572   
573   @Test ()
574   public void testRetryOnce() throws InterruptedException
575   {
576 //	  Logger.getRootLogger().setLevel(Level.INFO);
577 
578 //    String p = "test_";
579 //    System.out.println(p.substring(p.lastIndexOf('_')+1));
580     HelixTaskExecutor executor = new HelixTaskExecutor();
581     HelixManager manager = new MockClusterManager();
582 
583     CancellableHandlerFactory factory = new CancellableHandlerFactory();
584     executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
585 
586     NotificationContext changeContext = new NotificationContext(manager);
587 
588     List<Message> msgList = new ArrayList<Message>();
589 
590 //    factory.reset();
591 //    msgList.clear();
592     // Test the case that the message are executed for the second time
593     int nMsgs2 = 4;
594     for(int i = 0; i < nMsgs2; i++)
595     {
596       Message msg = new Message(factory.getMessageType(), UUID.randomUUID().toString());
597       msg.setTgtSessionId("*");
598       msg.setTgtName("Localhost_1123");
599       msg.setSrcName("127.101.1.23_2234");
600       msg.setExecutionTimeout((i+1) * 600);
601       msg.setRetryCount(1);
602       msgList.add(msg);
603     }
604     executor.onMessage("someInstance", msgList, changeContext);
605     Thread.sleep(3500);
606     AssertJUnit.assertEquals(factory._processedMsgIds.size(),3);
607     AssertJUnit.assertTrue(msgList.get(0).getRecord().getSimpleField("Cancelcount").equals("2"));
608     AssertJUnit.assertTrue(msgList.get(1).getRecord().getSimpleField("Cancelcount").equals("1"));
609     AssertJUnit.assertEquals(factory._timedOutMsgIds.size(),2);
610     AssertJUnit.assertTrue(executor._taskMap.size() == 0);
611     
612   }
613 }