1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.StringWriter;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.TreeMap;
29 import java.util.UUID;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ConcurrentSkipListSet;
32 import java.util.concurrent.CountDownLatch;
33
34 import org.apache.helix.Criteria;
35 import org.apache.helix.HelixDataAccessor;
36 import org.apache.helix.HelixManager;
37 import org.apache.helix.InstanceType;
38 import org.apache.helix.NotificationContext;
39 import org.apache.helix.PropertyKey;
40 import org.apache.helix.PropertyType;
41 import org.apache.helix.ZNRecord;
42 import org.apache.helix.PropertyKey.Builder;
43 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
44 import org.apache.helix.messaging.AsyncCallback;
45 import org.apache.helix.messaging.handling.HelixTaskResult;
46 import org.apache.helix.messaging.handling.MessageHandler;
47 import org.apache.helix.messaging.handling.MessageHandlerFactory;
48 import org.apache.helix.model.ClusterConstraints.ConstraintType;
49 import org.apache.helix.model.ConstraintItem;
50 import org.apache.helix.model.Message;
51 import org.apache.helix.model.Message.MessageState;
52 import org.apache.helix.model.Message.MessageType;
53 import org.apache.helix.model.StatusUpdate;
54 import org.apache.helix.monitoring.ZKPathDataDumpTask;
55 import org.apache.helix.util.HelixUtil;
56 import org.apache.log4j.Level;
57 import org.apache.log4j.Logger;
58 import org.codehaus.jackson.JsonGenerationException;
59 import org.codehaus.jackson.map.JsonMappingException;
60 import org.codehaus.jackson.map.ObjectMapper;
61 import org.codehaus.jackson.map.SerializationConfig;
62 import org.testng.Assert;
63 import org.testng.annotations.Test;
64
65
66 public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck
67 {
68
69 class MockAsyncCallback extends AsyncCallback
70 {
71 Message _message;
72 public MockAsyncCallback()
73 {
74 }
75
76 @Override
77 public void onTimeOut()
78 {
79
80
81 }
82
83 @Override
84 public void onReplyMessage(Message message)
85 {
86 _message = message;
87 }
88 }
89 TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
90 public static class TestMessagingHandlerFactory implements
91 MessageHandlerFactory
92 {
93 public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
94 @Override
95 public MessageHandler createHandler(Message message,
96 NotificationContext context)
97 {
98 return new TestMessagingHandler(message, context);
99 }
100
101 @Override
102 public String getMessageType()
103 {
104 return "TestParticipant";
105 }
106
107 @Override
108 public void reset()
109 {
110
111
112 }
113
114 public class TestMessagingHandler extends MessageHandler
115 {
116 public TestMessagingHandler(Message message, NotificationContext context)
117 {
118 super(message, context);
119
120 }
121
122 @Override
123 public HelixTaskResult handleMessage() throws InterruptedException
124 {
125 HelixTaskResult result = new HelixTaskResult();
126 result.setSuccess(true);
127 String destName = _message.getTgtName();
128 result.getTaskResultMap().put("Message", _message.getMsgId());
129 synchronized (_results)
130 {
131 if (!_results.containsKey(_message.getPartitionName()))
132 {
133 _results.put(_message.getPartitionName(),
134 new ConcurrentSkipListSet<String>());
135 }
136 }
137 _results.get(_message.getPartitionName()).add(_message.getMsgId());
138
139 return result;
140 }
141
142 @Override
143 public void onError(Exception e, ErrorCode code, ErrorType type)
144 {
145
146 }
147 }
148 }
149
150 public static class TestMessagingHandlerFactoryLatch implements
151 MessageHandlerFactory
152 {
153 public volatile CountDownLatch _latch = new CountDownLatch(1);
154 public int _messageCount = 0;
155 public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
156 @Override
157 public synchronized MessageHandler createHandler(Message message,
158 NotificationContext context)
159 {
160 _messageCount++;
161 return new TestMessagingHandlerLatch(message, context);
162 }
163
164 public synchronized void signal()
165 {
166 _latch.countDown();
167 _latch = new CountDownLatch(1);
168 }
169
170 @Override
171 public String getMessageType()
172 {
173 return "TestMessagingHandlerLatch";
174 }
175
176 @Override
177 public void reset()
178 {
179
180 }
181
182 public class TestMessagingHandlerLatch extends MessageHandler
183 {
184 public TestMessagingHandlerLatch(Message message, NotificationContext context)
185 {
186 super(message, context);
187
188 }
189
190 @Override
191 public HelixTaskResult handleMessage() throws InterruptedException
192 {
193 _latch.await();
194 HelixTaskResult result = new HelixTaskResult();
195 result.setSuccess(true);
196 result.getTaskResultMap().put("Message", _message.getMsgId());
197 String destName = _message.getTgtName();
198 synchronized (_results)
199 {
200 if (!_results.containsKey(_message.getPartitionName()))
201 {
202 _results.put(_message.getPartitionName(),
203 new ConcurrentSkipListSet<String>());
204 }
205 }
206 _results.get(_message.getPartitionName()).add(destName);
207
208 return result;
209 }
210
211 @Override
212 public void onError(Exception e, ErrorCode code, ErrorType type)
213 {
214
215 }
216 }
217 }
218
219 @Test()
220 public void TestSchedulerMsgUsingQueue() throws Exception
221 {
222 Logger.getRootLogger().setLevel(Level.INFO);
223 _factory._results.clear();
224 HelixManager manager = null;
225 for (int i = 0; i < NODE_NR; i++)
226 {
227 String hostDest = "localhost_" + (START_PORT + i);
228 _startCMResultMap.get(hostDest)._manager.getMessagingService()
229 .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
230 manager = _startCMResultMap.get(hostDest)._manager;
231 }
232
233 Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
234 .randomUUID().toString());
235 schedulerMessage.setTgtSessionId("*");
236 schedulerMessage.setTgtName("CONTROLLER");
237
238 schedulerMessage.setSrcName("CONTROLLER");
239 schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
240
241 Message msg = new Message(_factory.getMessageType(), "Template");
242 msg.setTgtSessionId("*");
243 msg.setMsgState(MessageState.NEW);
244
245
246 Criteria cr = new Criteria();
247 cr.setInstanceName("localhost_%");
248 cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
249 cr.setSessionSpecific(false);
250 cr.setResource("%");
251 cr.setPartition("%");
252
253 ObjectMapper mapper = new ObjectMapper();
254 SerializationConfig serializationConfig = mapper.getSerializationConfig();
255 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
256
257 StringWriter sw = new StringWriter();
258 mapper.writeValue(sw, cr);
259
260 String crString = sw.toString();
261
262 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
263 schedulerMessage.getRecord().setMapField("MessageTemplate",
264 msg.getRecord().getSimpleFields());
265 schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
266
267 HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
268 Builder keyBuilder = helixDataAccessor.keyBuilder();
269 helixDataAccessor.createProperty(
270 keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
271 schedulerMessage);
272
273 for(int i = 0; i < 30; i++)
274 {
275 Thread.sleep(2000);
276 if(_PARTITIONS == _factory._results.size())
277 {
278 break;
279 }
280 }
281
282 Assert.assertEquals(_PARTITIONS, _factory._results.size());
283 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
284 MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
285
286 int messageResultCount = 0;
287 for(int i = 0; i < 10; i++)
288 {
289 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
290 .getRecord();
291 Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
292 .get("MessageCount").equals("" + (_PARTITIONS * 3)));
293 for(String key : statusUpdate.getMapFields().keySet())
294 {
295 if(key.startsWith("MessageResult "))
296 {
297 messageResultCount ++;
298 }
299 }
300 if(messageResultCount == _PARTITIONS * 3)
301 {
302 break;
303 }
304 else
305 {
306 Thread.sleep(2000);
307 }
308 }
309 Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
310 int count = 0;
311 for (Set<String> val : _factory._results.values())
312 {
313 count += val.size();
314 }
315 Assert.assertEquals(count, _PARTITIONS * 3);
316
317
318 }
319
320 @Test()
321 public void TestSchedulerMsg() throws Exception
322 {
323 Logger.getRootLogger().setLevel(Level.INFO);
324 _factory._results.clear();
325 HelixManager manager = null;
326 for (int i = 0; i < NODE_NR; i++)
327 {
328 String hostDest = "localhost_" + (START_PORT + i);
329 _startCMResultMap.get(hostDest)._manager.getMessagingService()
330 .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
331 manager = _startCMResultMap.get(hostDest)._manager;
332 }
333
334 Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
335 .randomUUID().toString());
336 schedulerMessage.setTgtSessionId("*");
337 schedulerMessage.setTgtName("CONTROLLER");
338
339 schedulerMessage.setSrcName("CONTROLLER");
340
341
342 Message msg = new Message(_factory.getMessageType(), "Template");
343 msg.setTgtSessionId("*");
344 msg.setMsgState(MessageState.NEW);
345
346
347 Criteria cr = new Criteria();
348 cr.setInstanceName("localhost_%");
349 cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
350 cr.setSessionSpecific(false);
351 cr.setResource("%");
352 cr.setPartition("%");
353
354 ObjectMapper mapper = new ObjectMapper();
355 SerializationConfig serializationConfig = mapper.getSerializationConfig();
356 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
357
358 StringWriter sw = new StringWriter();
359 mapper.writeValue(sw, cr);
360
361 String crString = sw.toString();
362
363 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
364 schedulerMessage.getRecord().setMapField("MessageTemplate",
365 msg.getRecord().getSimpleFields());
366 schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
367
368 HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
369 Builder keyBuilder = helixDataAccessor.keyBuilder();
370 helixDataAccessor.createProperty(
371 keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
372 schedulerMessage);
373
374 for(int i = 0; i < 30; i++)
375 {
376 Thread.sleep(2000);
377 if(_PARTITIONS == _factory._results.size())
378 {
379 break;
380 }
381 }
382
383 Assert.assertEquals(_PARTITIONS, _factory._results.size());
384 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
385 MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
386
387 int messageResultCount = 0;
388 for(int i = 0; i < 10; i++)
389 {
390 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
391 .getRecord();
392 Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
393 .get("MessageCount").equals("" + (_PARTITIONS * 3)));
394 for(String key : statusUpdate.getMapFields().keySet())
395 {
396 if(key.startsWith("MessageResult "))
397 {
398 messageResultCount ++;
399 Assert.assertTrue(statusUpdate.getMapField(key).size() > 1);
400 }
401 }
402 if(messageResultCount == _PARTITIONS * 3)
403 {
404 break;
405 }
406 else
407 {
408 Thread.sleep(2000);
409 }
410 }
411 Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
412 int count = 0;
413 for (Set<String> val : _factory._results.values())
414 {
415 count += val.size();
416 }
417 Assert.assertEquals(count, _PARTITIONS * 3);
418
419
420 String controllerStatusPath = HelixUtil.getControllerPropertyPath(manager.getClusterName(),
421 PropertyType.STATUSUPDATES_CONTROLLER);
422 List<String> subPaths = _zkClient.getChildren(controllerStatusPath);
423 Assert.assertTrue(subPaths.size() > 0);
424 for(String subPath : subPaths)
425 {
426 String nextPath = controllerStatusPath + "/" + subPath;
427 List<String> subsubPaths = _zkClient.getChildren(nextPath);
428 Assert.assertTrue(subsubPaths.size() > 0);
429 }
430
431 String instanceStatusPath = HelixUtil.getInstancePropertyPath(manager.getClusterName(), "localhost_" + (START_PORT),
432 PropertyType.STATUSUPDATES);
433
434 subPaths = _zkClient.getChildren(instanceStatusPath);
435 Assert.assertTrue(subPaths.size() > 0);
436 for(String subPath : subPaths)
437 {
438 String nextPath = instanceStatusPath + "/" + subPath;
439 List<String> subsubPaths = _zkClient.getChildren(nextPath);
440 Assert.assertTrue(subsubPaths.size() > 0);
441 for(String subsubPath : subsubPaths)
442 {
443 String nextnextPath = nextPath + "/" + subsubPath;
444 Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
445 }
446 }
447 Thread.sleep(3000);
448 ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
449 dumpTask.run();
450
451 subPaths = _zkClient.getChildren(controllerStatusPath);
452 Assert.assertTrue(subPaths.size() > 0);
453 for(String subPath : subPaths)
454 {
455 String nextPath = controllerStatusPath + "/" + subPath;
456 List<String> subsubPaths = _zkClient.getChildren(nextPath);
457 Assert.assertTrue(subsubPaths.size() == 0);
458 }
459
460 subPaths = _zkClient.getChildren(instanceStatusPath);
461 Assert.assertTrue(subPaths.size() > 0);
462 for(String subPath : subPaths)
463 {
464 String nextPath = instanceStatusPath + "/" + subPath;
465 List<String> subsubPaths = _zkClient.getChildren(nextPath);
466 Assert.assertTrue(subsubPaths.size() > 0);
467 for(String subsubPath : subsubPaths)
468 {
469 String nextnextPath = nextPath + "/" + subsubPath;
470 Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() == 0);
471 }
472 }
473 }
474
475
476 @Test()
477 public void TestSchedulerMsg2() throws Exception
478 {
479 _factory._results.clear();
480 HelixManager manager = null;
481 for (int i = 0; i < NODE_NR; i++)
482 {
483 String hostDest = "localhost_" + (START_PORT + i);
484 _startCMResultMap.get(hostDest)._manager.getMessagingService()
485 .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
486 manager = _startCMResultMap.get(hostDest)._manager;
487 }
488
489 Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
490 .randomUUID().toString());
491 schedulerMessage.setTgtSessionId("*");
492 schedulerMessage.setTgtName("CONTROLLER");
493
494 schedulerMessage.setSrcName("CONTROLLER");
495
496
497 Message msg = new Message(_factory.getMessageType(), "Template");
498 msg.setTgtSessionId("*");
499 msg.setMsgState(MessageState.NEW);
500
501
502 Criteria cr = new Criteria();
503 cr.setInstanceName("localhost_%");
504 cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
505 cr.setSessionSpecific(false);
506 cr.setResource("%");
507 cr.setPartition("%");
508
509 ObjectMapper mapper = new ObjectMapper();
510 SerializationConfig serializationConfig = mapper.getSerializationConfig();
511 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
512
513 StringWriter sw = new StringWriter();
514 mapper.writeValue(sw, cr);
515
516 String crString = sw.toString();
517
518 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
519 schedulerMessage.getRecord().setMapField("MessageTemplate",
520 msg.getRecord().getSimpleFields());
521 schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
522 schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
523
524 Criteria cr2 = new Criteria();
525 cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
526 cr2.setInstanceName("*");
527 cr2.setSessionSpecific(false);
528
529 schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg2");
530 MockAsyncCallback callback = new MockAsyncCallback();
531 manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
532 String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
533
534 HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
535 Builder keyBuilder = helixDataAccessor.keyBuilder();
536 for(int i = 0;i < 10; i++ )
537 {
538 Thread.sleep(200);
539 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
540 MessageType.SCHEDULER_MSG.toString(), msgId);
541 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
542 .getRecord();
543 if(statusUpdate.getMapFields().containsKey("Summary"))
544 {
545 break;
546 }
547 }
548
549 Assert.assertEquals(_PARTITIONS, _factory._results.size());
550 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
551 MessageType.SCHEDULER_MSG.toString(), msgId);
552 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
553 .getRecord();
554 Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
555 .get("MessageCount").equals("" + (_PARTITIONS * 3)));
556 int messageResultCount = 0;
557 for(String key : statusUpdate.getMapFields().keySet())
558 {
559 if(key.startsWith("MessageResult "))
560 {
561 messageResultCount ++;
562 }
563 }
564 Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
565
566 int count = 0;
567 for (Set<String> val : _factory._results.values())
568 {
569 count += val.size();
570 }
571 Assert.assertEquals(count, _PARTITIONS * 3);
572 }
573
574 @Test()
575 public void TestSchedulerZeroMsg() throws Exception
576 {
577 TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
578 HelixManager manager = null;
579 for (int i = 0; i < NODE_NR; i++)
580 {
581 String hostDest = "localhost_" + (START_PORT + i);
582 _startCMResultMap.get(hostDest)._manager.getMessagingService()
583 .registerMessageHandlerFactory(factory.getMessageType(), factory);
584 manager = _startCMResultMap.get(hostDest)._manager;
585 }
586
587 Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
588 .randomUUID().toString());
589 schedulerMessage.setTgtSessionId("*");
590 schedulerMessage.setTgtName("CONTROLLER");
591
592 schedulerMessage.setSrcName("CONTROLLER");
593
594
595 Message msg = new Message(factory.getMessageType(), "Template");
596 msg.setTgtSessionId("*");
597 msg.setMsgState(MessageState.NEW);
598
599
600 Criteria cr = new Criteria();
601 cr.setInstanceName("localhost_DOESNOTEXIST");
602 cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
603 cr.setSessionSpecific(false);
604 cr.setResource("%");
605 cr.setPartition("%");
606
607 ObjectMapper mapper = new ObjectMapper();
608 SerializationConfig serializationConfig = mapper.getSerializationConfig();
609 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
610
611 StringWriter sw = new StringWriter();
612 mapper.writeValue(sw, cr);
613
614 String crString = sw.toString();
615
616 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
617 schedulerMessage.getRecord().setMapField("MessageTemplate",
618 msg.getRecord().getSimpleFields());
619 schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
620
621 HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
622 Builder keyBuilder = helixDataAccessor.keyBuilder();
623 PropertyKey controllerMessageKey = keyBuilder
624 .controllerMessage(schedulerMessage.getMsgId());
625 helixDataAccessor.setProperty(controllerMessageKey, schedulerMessage);
626
627 Thread.sleep(3000);
628
629 Assert.assertEquals(0, factory._results.size());
630 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
631 MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
632 for(int i = 0; i< 10; i++)
633 {
634 StatusUpdate update = helixDataAccessor.getProperty(controllerTaskStatus);
635 if(update == null || update.getRecord().getMapField("SentMessageCount") == null)
636 {
637 Thread.sleep(1000);
638 }
639 }
640 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
641 .getRecord();
642 Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
643 .get("MessageCount").equals("0"));
644 int count = 0;
645 for (Set<String> val : factory._results.values())
646 {
647 count += val.size();
648 }
649 Assert.assertEquals(count, 0);
650 }
651
652
653 @Test()
654 public void TestSchedulerMsg3() throws Exception
655 {
656 _factory._results.clear();
657 HelixManager manager = null;
658 for (int i = 0; i < NODE_NR; i++)
659 {
660 String hostDest = "localhost_" + (START_PORT + i);
661 _startCMResultMap.get(hostDest)._manager.getMessagingService()
662 .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
663
664 _startCMResultMap.get(hostDest)._manager.getMessagingService()
665 .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
666 manager = _startCMResultMap.get(hostDest)._manager;
667 }
668
669 Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
670 .randomUUID().toString());
671 schedulerMessage.setTgtSessionId("*");
672 schedulerMessage.setTgtName("CONTROLLER");
673
674 schedulerMessage.setSrcName("CONTROLLER");
675
676
677 Message msg = new Message(_factory.getMessageType(), "Template");
678 msg.setTgtSessionId("*");
679 msg.setMsgState(MessageState.NEW);
680
681
682 Criteria cr = new Criteria();
683 cr.setInstanceName("localhost_%");
684 cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
685 cr.setSessionSpecific(false);
686 cr.setResource("%");
687 cr.setPartition("%");
688
689 ObjectMapper mapper = new ObjectMapper();
690 SerializationConfig serializationConfig = mapper.getSerializationConfig();
691 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
692
693 StringWriter sw = new StringWriter();
694 mapper.writeValue(sw, cr);
695
696 String crString = sw.toString();
697
698 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
699 schedulerMessage.getRecord().setMapField("MessageTemplate",
700 msg.getRecord().getSimpleFields());
701 schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
702 schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
703
704 schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg3");
705 Criteria cr2 = new Criteria();
706 cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
707 cr2.setInstanceName("*");
708 cr2.setSessionSpecific(false);
709
710 MockAsyncCallback callback = new MockAsyncCallback();
711 cr.setInstanceName("localhost_%");
712 mapper = new ObjectMapper();
713 serializationConfig = mapper.getSerializationConfig();
714 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
715
716 sw = new StringWriter();
717 mapper.writeValue(sw, cr);
718
719 crString = sw.toString();
720 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
721
722 for(int i = 0; i < 4; i++)
723 {
724 callback = new MockAsyncCallback();
725 cr.setInstanceName("localhost_"+(START_PORT + i));
726 mapper = new ObjectMapper();
727 serializationConfig = mapper.getSerializationConfig();
728 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
729
730 sw = new StringWriter();
731 mapper.writeValue(sw, cr);
732 schedulerMessage.setMsgId(UUID.randomUUID().toString());
733 crString = sw.toString();
734 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
735 manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
736 String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
737
738 HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
739 Builder keyBuilder = helixDataAccessor.keyBuilder();
740
741 for(int j = 0;j < 100; j++ )
742 {
743 Thread.sleep(200);
744 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
745 MessageType.SCHEDULER_MSG.toString(), msgId);
746 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
747 .getRecord();
748 if(statusUpdate.getMapFields().containsKey("Summary"))
749 {
750 break;
751 }
752 }
753
754 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
755 MessageType.SCHEDULER_MSG.toString(), msgId);
756 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
757 .getRecord();
758 Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
759 .get("MessageCount").equals("" + (_PARTITIONS * 3 / 5)));
760 int messageResultCount = 0;
761 for(String key : statusUpdate.getMapFields().keySet())
762 {
763 if(key.startsWith("MessageResult"))
764 {
765 messageResultCount ++;
766 }
767 }
768 Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
769
770 int count = 0;
771
772 for (Set<String> val : _factory._results.values())
773 {
774
775 count += val.size();
776 }
777
778 Assert.assertEquals(count, _PARTITIONS * 3/ 5 *(i+1) );
779 }
780 }
781
782
783 @Test()
784 public void TestSchedulerMsg4() throws Exception
785 {
786 _factory._results.clear();
787 HelixManager manager = null;
788 for (int i = 0; i < NODE_NR; i++)
789 {
790 String hostDest = "localhost_" + (START_PORT + i);
791 _startCMResultMap.get(hostDest)._manager.getMessagingService()
792 .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
793
794 _startCMResultMap.get(hostDest)._manager.getMessagingService()
795 .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
796 manager = _startCMResultMap.get(hostDest)._manager;
797 }
798
799 Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
800 .randomUUID().toString());
801 schedulerMessage.setTgtSessionId("*");
802 schedulerMessage.setTgtName("CONTROLLER");
803
804 schedulerMessage.setSrcName("CONTROLLER");
805
806
807 Message msg = new Message(_factory.getMessageType(), "Template");
808 msg.setTgtSessionId("*");
809 msg.setMsgState(MessageState.NEW);
810
811
812 Criteria cr = new Criteria();
813 cr.setInstanceName("localhost_%");
814 cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
815 cr.setSessionSpecific(false);
816 cr.setResource("TestDB");
817 cr.setPartition("%");
818
819 ObjectMapper mapper = new ObjectMapper();
820 SerializationConfig serializationConfig = mapper.getSerializationConfig();
821 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
822
823 StringWriter sw = new StringWriter();
824 mapper.writeValue(sw, cr);
825
826 String crString = sw.toString();
827
828 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
829 schedulerMessage.getRecord().setMapField("MessageTemplate",
830 msg.getRecord().getSimpleFields());
831 schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
832 schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
833
834 schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg4");
835 Criteria cr2 = new Criteria();
836 cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
837 cr2.setInstanceName("*");
838 cr2.setSessionSpecific(false);
839
840 Map<String, String> constraints = new TreeMap<String, String>();
841 constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
842 constraints.put("TRANSITION", "OFFLINE-COMPLETED");
843 constraints.put("CONSTRAINT_VALUE", "1");
844 constraints.put("INSTANCE", ".*");
845 manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
846 ConstraintType.MESSAGE_CONSTRAINT,
847 "constraint1",
848 new ConstraintItem(constraints));
849
850 MockAsyncCallback callback = new MockAsyncCallback();
851 cr.setInstanceName("localhost_%");
852 mapper = new ObjectMapper();
853 serializationConfig = mapper.getSerializationConfig();
854 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
855
856 sw = new StringWriter();
857 mapper.writeValue(sw, cr);
858
859 crString = sw.toString();
860 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
861 manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
862 String msgIdPrime = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
863
864 HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
865 Builder keyBuilder = helixDataAccessor.keyBuilder();
866 ArrayList<String> msgIds = new ArrayList<String>();
867 for(int i = 0; i < NODE_NR; i++)
868 {
869 callback = new MockAsyncCallback();
870 cr.setInstanceName("localhost_"+(START_PORT + i));
871 mapper = new ObjectMapper();
872 serializationConfig = mapper.getSerializationConfig();
873 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
874
875 sw = new StringWriter();
876 mapper.writeValue(sw, cr);
877 schedulerMessage.setMsgId(UUID.randomUUID().toString());
878 crString = sw.toString();
879 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
880 manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
881 String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
882 msgIds.add(msgId);
883 }
884 for(int i = 0; i < NODE_NR; i++)
885 {
886 String msgId = msgIds.get(i);
887 for(int j = 0;j < 100; j++ )
888 {
889 Thread.sleep(200);
890 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
891 MessageType.SCHEDULER_MSG.toString(), msgId);
892 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
893 .getRecord();
894 if(statusUpdate.getMapFields().containsKey("Summary"))
895 {
896
897 break;
898 }
899 }
900
901 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
902 MessageType.SCHEDULER_MSG.toString(), msgId);
903 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
904 .getRecord();
905 Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
906 .get("MessageCount").equals("" + (_PARTITIONS * 3 / 5)));
907 int messageResultCount = 0;
908 for(String key : statusUpdate.getMapFields().keySet())
909 {
910 if(key.startsWith("MessageResult"))
911 {
912 messageResultCount ++;
913 }
914 }
915 if(messageResultCount != _PARTITIONS * 3 / 5)
916 {
917 int x = 10;
918 x = x + messageResultCount;
919 }
920 Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
921 }
922
923 for(int j = 0;j < 100; j++ )
924 {
925 Thread.sleep(200);
926 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
927 MessageType.SCHEDULER_MSG.toString(), msgIdPrime);
928 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
929 .getRecord();
930 if(statusUpdate.getMapFields().containsKey("Summary"))
931 {
932 break;
933 }
934 }
935 int count = 0;
936 for (Set<String> val : _factory._results.values())
937 {
938
939 count += val.size();
940 }
941
942 Assert.assertEquals(count, _PARTITIONS * 3 * 2 );
943 }
944
945
946 @Test
947 public void TestSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException, IOException, InterruptedException
948 {
949 TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
950 HelixManager manager = null;
951 for (int i = 0; i < NODE_NR; i++)
952 {
953 String hostDest = "localhost_" + (START_PORT + i);
954 _startCMResultMap.get(hostDest)._manager.getMessagingService()
955 .registerMessageHandlerFactory(factory.getMessageType(), factory);
956
957 _startCMResultMap.get(hostDest)._manager.getMessagingService()
958 .registerMessageHandlerFactory(factory.getMessageType(), factory);
959 manager = _startCMResultMap.get(hostDest)._manager;
960 }
961
962 Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
963 .randomUUID().toString());
964 schedulerMessage.setTgtSessionId("*");
965 schedulerMessage.setTgtName("CONTROLLER");
966
967 schedulerMessage.setSrcName("CONTROLLER");
968
969
970 Message msg = new Message(factory.getMessageType(), "Template");
971 msg.setTgtSessionId("*");
972 msg.setMsgState(MessageState.NEW);
973
974
975 Criteria cr = new Criteria();
976 cr.setInstanceName("localhost_%");
977 cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
978 cr.setSessionSpecific(false);
979 cr.setResource("%");
980 cr.setPartition("%");
981
982 ObjectMapper mapper = new ObjectMapper();
983 SerializationConfig serializationConfig = mapper.getSerializationConfig();
984 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
985
986 StringWriter sw = new StringWriter();
987 mapper.writeValue(sw, cr);
988
989 String crString = sw.toString();
990
991 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
992 schedulerMessage.getRecord().setMapField("MessageTemplate",
993 msg.getRecord().getSimpleFields());
994 schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
995 schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
996 schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
997
998 Criteria cr2 = new Criteria();
999 cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
1000 cr2.setInstanceName("*");
1001 cr2.setSessionSpecific(false);
1002
1003
1004 MockAsyncCallback callback = new MockAsyncCallback();
1005 mapper = new ObjectMapper();
1006 serializationConfig = mapper.getSerializationConfig();
1007 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
1008
1009 sw = new StringWriter();
1010 mapper.writeValue(sw, cr);
1011
1012
1013 HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
1014 Builder keyBuilder = helixDataAccessor.keyBuilder();
1015
1016
1017 Map<String, String> constraints = new TreeMap<String, String>();
1018 constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
1019 constraints.put("TRANSITION", "OFFLINE-COMPLETED");
1020 constraints.put("CONSTRAINT_VALUE", "1");
1021 constraints.put("INSTANCE", ".*");
1022 manager.getClusterManagmentTool().setConstraint(manager.getClusterName(),
1023 ConstraintType.MESSAGE_CONSTRAINT,
1024 "constraint1",
1025 new ConstraintItem(constraints));
1026
1027
1028 crString = sw.toString();
1029 schedulerMessage.getRecord().setSimpleField("Criteria", crString);
1030 manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
1031 String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
1032
1033 for(int j = 0;j < 10; j++ )
1034 {
1035 Thread.sleep(200);
1036 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
1037 MessageType.SCHEDULER_MSG.toString(), msgId);
1038 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
1039 .getRecord();
1040 if(statusUpdate.getMapFields().containsKey("SentMessageCount"))
1041 {
1042 Assert.assertEquals(statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""+(_PARTITIONS * 3));
1043 break;
1044 }
1045 }
1046
1047 for(int i = 0; i < _PARTITIONS * 3 / 5; i++)
1048 {
1049 for(int j = 0; j< 10; j++)
1050 {
1051 Thread.sleep(300);
1052 if(factory._messageCount == 5*(i+1)) break;
1053 }
1054 Thread.sleep(300);
1055 Assert.assertEquals(factory._messageCount, 5*(i+1));
1056 factory.signal();
1057
1058 }
1059
1060 for(int j = 0;j < 10; j++ )
1061 {
1062 Thread.sleep(200);
1063 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
1064 MessageType.SCHEDULER_MSG.toString(), msgId);
1065 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
1066 .getRecord();
1067 if(statusUpdate.getMapFields().containsKey("Summary"))
1068 {
1069 break;
1070 }
1071 }
1072
1073 Assert.assertEquals(_PARTITIONS, factory._results.size());
1074 PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
1075 MessageType.SCHEDULER_MSG.toString(), msgId);
1076 ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
1077 .getRecord();
1078 Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
1079 .get("MessageCount").equals("" + (_PARTITIONS * 3)));
1080 int messageResultCount = 0;
1081 for(String key : statusUpdate.getMapFields().keySet())
1082 {
1083 if(key.startsWith("MessageResult "))
1084 {
1085 messageResultCount ++;
1086 }
1087 }
1088 Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
1089
1090 int count = 0;
1091 for (Set<String> val : factory._results.values())
1092 {
1093 count += val.size();
1094 }
1095 Assert.assertEquals(count, _PARTITIONS * 3);
1096
1097 manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(),
1098 ConstraintType.MESSAGE_CONSTRAINT,
1099 "constraint1");
1100
1101 }
1102 }