View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.Set;
28  import java.util.TreeMap;
29  import java.util.UUID;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.ConcurrentSkipListSet;
32  import java.util.concurrent.CountDownLatch;
33  
34  import org.apache.helix.Criteria;
35  import org.apache.helix.HelixDataAccessor;
36  import org.apache.helix.HelixManager;
37  import org.apache.helix.InstanceType;
38  import org.apache.helix.NotificationContext;
39  import org.apache.helix.PropertyKey;
40  import org.apache.helix.PropertyType;
41  import org.apache.helix.ZNRecord;
42  import org.apache.helix.PropertyKey.Builder;
43  import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
44  import org.apache.helix.messaging.AsyncCallback;
45  import org.apache.helix.messaging.handling.HelixTaskResult;
46  import org.apache.helix.messaging.handling.MessageHandler;
47  import org.apache.helix.messaging.handling.MessageHandlerFactory;
48  import org.apache.helix.model.ClusterConstraints.ConstraintType;
49  import org.apache.helix.model.ConstraintItem;
50  import org.apache.helix.model.Message;
51  import org.apache.helix.model.Message.MessageState;
52  import org.apache.helix.model.Message.MessageType;
53  import org.apache.helix.model.StatusUpdate;
54  import org.apache.helix.monitoring.ZKPathDataDumpTask;
55  import org.apache.helix.util.HelixUtil;
56  import org.apache.log4j.Level;
57  import org.apache.log4j.Logger;
58  import org.codehaus.jackson.JsonGenerationException;
59  import org.codehaus.jackson.map.JsonMappingException;
60  import org.codehaus.jackson.map.ObjectMapper;
61  import org.codehaus.jackson.map.SerializationConfig;
62  import org.testng.Assert;
63  import org.testng.annotations.Test;
64  
65  
66  public class TestSchedulerMessage extends ZkStandAloneCMTestBaseWithPropertyServerCheck
67  {
68  
69    class MockAsyncCallback extends AsyncCallback
70    {
71      Message _message;
72      public MockAsyncCallback()
73      {
74      }
75  
76      @Override
77      public void onTimeOut()
78      {
79        // TODO Auto-generated method stub
80  
81      }
82  
83      @Override
84      public void onReplyMessage(Message message)
85      {
86        _message = message;
87      }
88    }
89    TestMessagingHandlerFactory _factory = new TestMessagingHandlerFactory();
90    public static class TestMessagingHandlerFactory implements
91        MessageHandlerFactory
92    {
93      public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
94      @Override
95      public MessageHandler createHandler(Message message,
96          NotificationContext context)
97      {
98        return new TestMessagingHandler(message, context);
99      }
100 
101     @Override
102     public String getMessageType()
103     {
104       return "TestParticipant";
105     }
106 
107     @Override
108     public void reset()
109     {
110       // TODO Auto-generated method stub
111 
112     }
113 
114     public class TestMessagingHandler extends MessageHandler
115     {
116       public TestMessagingHandler(Message message, NotificationContext context)
117       {
118         super(message, context);
119         // TODO Auto-generated constructor stub
120       }
121 
122       @Override
123       public HelixTaskResult handleMessage() throws InterruptedException
124       {
125         HelixTaskResult result = new HelixTaskResult();
126         result.setSuccess(true);
127         String destName = _message.getTgtName();
128         result.getTaskResultMap().put("Message", _message.getMsgId());
129         synchronized (_results)
130         {
131           if (!_results.containsKey(_message.getPartitionName()))
132           {
133             _results.put(_message.getPartitionName(),
134                 new ConcurrentSkipListSet<String>());
135           }
136         }
137         _results.get(_message.getPartitionName()).add(_message.getMsgId());
138         //System.err.println("Message " + _message.getMsgId() + " executed");
139         return result;
140       }
141 
142       @Override
143       public void onError(Exception e, ErrorCode code, ErrorType type)
144       {
145         // TODO Auto-generated method stub
146       }
147     }
148   }
149   
150   public static class TestMessagingHandlerFactoryLatch implements
151     MessageHandlerFactory
152   {
153     public volatile CountDownLatch _latch = new CountDownLatch(1);
154     public int _messageCount = 0;
155     public Map<String, Set<String>> _results = new ConcurrentHashMap<String, Set<String>>();
156     @Override
157     public synchronized MessageHandler createHandler(Message message,
158       NotificationContext context)
159     {
160       _messageCount++;
161       return new TestMessagingHandlerLatch(message, context);
162     }
163     
164     public synchronized void signal()
165     {
166       _latch.countDown();
167       _latch = new CountDownLatch(1);
168     }
169   
170     @Override
171     public String getMessageType()
172     {
173       return "TestMessagingHandlerLatch";
174     }
175   
176     @Override
177     public void reset()
178     {
179       // TODO Auto-generated method stub
180     }
181 
182   public class TestMessagingHandlerLatch extends MessageHandler
183   {
184     public TestMessagingHandlerLatch(Message message, NotificationContext context)
185     {
186       super(message, context);
187       // TODO Auto-generated constructor stub
188     }
189   
190     @Override
191     public HelixTaskResult handleMessage() throws InterruptedException
192     {
193       _latch.await();
194       HelixTaskResult result = new HelixTaskResult();
195       result.setSuccess(true);
196       result.getTaskResultMap().put("Message", _message.getMsgId());
197       String destName = _message.getTgtName();
198       synchronized (_results)
199       {
200         if (!_results.containsKey(_message.getPartitionName()))
201         {
202           _results.put(_message.getPartitionName(),
203               new ConcurrentSkipListSet<String>());
204         }
205       }
206       _results.get(_message.getPartitionName()).add(destName);
207       //System.err.println("Message " + _message.getMsgId() + " executed");
208       return result;
209     }
210   
211     @Override
212     public void onError(Exception e, ErrorCode code, ErrorType type)
213     {
214       // TODO Auto-generated method stub
215     }
216   }
217 }
218   
219   @Test()
220   public void TestSchedulerMsgUsingQueue() throws Exception
221   {
222     Logger.getRootLogger().setLevel(Level.INFO);
223     _factory._results.clear();
224     HelixManager manager = null;
225     for (int i = 0; i < NODE_NR; i++)
226     {
227       String hostDest = "localhost_" + (START_PORT + i);
228       _startCMResultMap.get(hostDest)._manager.getMessagingService()
229           .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
230       manager = _startCMResultMap.get(hostDest)._manager;
231     }
232 
233     Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
234         .randomUUID().toString());
235     schedulerMessage.setTgtSessionId("*");
236     schedulerMessage.setTgtName("CONTROLLER");
237     // TODO: change it to "ADMIN" ?
238     schedulerMessage.setSrcName("CONTROLLER");
239     schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
240     // Template for the individual message sent to each participant
241     Message msg = new Message(_factory.getMessageType(), "Template");
242     msg.setTgtSessionId("*");
243     msg.setMsgState(MessageState.NEW);
244 
245     // Criteria to send individual messages
246     Criteria cr = new Criteria();
247     cr.setInstanceName("localhost_%");
248     cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
249     cr.setSessionSpecific(false);
250     cr.setResource("%");
251     cr.setPartition("%");
252 
253     ObjectMapper mapper = new ObjectMapper();
254     SerializationConfig serializationConfig = mapper.getSerializationConfig();
255     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
256 
257     StringWriter sw = new StringWriter();
258     mapper.writeValue(sw, cr);
259 
260     String crString = sw.toString();
261 
262     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
263     schedulerMessage.getRecord().setMapField("MessageTemplate",
264         msg.getRecord().getSimpleFields());
265     schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
266 
267     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
268     Builder keyBuilder = helixDataAccessor.keyBuilder();
269     helixDataAccessor.createProperty(
270         keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
271         schedulerMessage);
272     
273     for(int i = 0; i < 30; i++)
274     {
275       Thread.sleep(2000);
276       if(_PARTITIONS == _factory._results.size())
277       {
278         break;
279       }
280     }
281 
282     Assert.assertEquals(_PARTITIONS, _factory._results.size());
283     PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
284         MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
285       
286     int messageResultCount = 0;
287     for(int i = 0; i < 10; i++)
288     {
289       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
290           .getRecord();
291       Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
292           .get("MessageCount").equals("" + (_PARTITIONS * 3)));
293       for(String key : statusUpdate.getMapFields().keySet())
294       {
295         if(key.startsWith("MessageResult "))
296         {
297           messageResultCount ++;
298         }
299       }
300       if(messageResultCount == _PARTITIONS * 3)
301       {
302         break;
303       }
304       else
305       {
306         Thread.sleep(2000);
307       }
308     }
309     Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
310     int count = 0;
311     for (Set<String> val : _factory._results.values())
312     {
313       count += val.size();
314     }
315     Assert.assertEquals(count, _PARTITIONS * 3);
316     
317    
318   }
319   
320   @Test()
321   public void TestSchedulerMsg() throws Exception
322   {
323     Logger.getRootLogger().setLevel(Level.INFO);
324     _factory._results.clear();
325     HelixManager manager = null;
326     for (int i = 0; i < NODE_NR; i++)
327     {
328       String hostDest = "localhost_" + (START_PORT + i);
329       _startCMResultMap.get(hostDest)._manager.getMessagingService()
330           .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
331       manager = _startCMResultMap.get(hostDest)._manager;
332     }
333 
334     Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
335         .randomUUID().toString());
336     schedulerMessage.setTgtSessionId("*");
337     schedulerMessage.setTgtName("CONTROLLER");
338     // TODO: change it to "ADMIN" ?
339     schedulerMessage.setSrcName("CONTROLLER");
340     //schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg");
341     // Template for the individual message sent to each participant
342     Message msg = new Message(_factory.getMessageType(), "Template");
343     msg.setTgtSessionId("*");
344     msg.setMsgState(MessageState.NEW);
345 
346     // Criteria to send individual messages
347     Criteria cr = new Criteria();
348     cr.setInstanceName("localhost_%");
349     cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
350     cr.setSessionSpecific(false);
351     cr.setResource("%");
352     cr.setPartition("%");
353 
354     ObjectMapper mapper = new ObjectMapper();
355     SerializationConfig serializationConfig = mapper.getSerializationConfig();
356     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
357 
358     StringWriter sw = new StringWriter();
359     mapper.writeValue(sw, cr);
360 
361     String crString = sw.toString();
362 
363     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
364     schedulerMessage.getRecord().setMapField("MessageTemplate",
365         msg.getRecord().getSimpleFields());
366     schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
367 
368     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
369     Builder keyBuilder = helixDataAccessor.keyBuilder();
370     helixDataAccessor.createProperty(
371         keyBuilder.controllerMessage(schedulerMessage.getMsgId()),
372         schedulerMessage);
373     
374     for(int i = 0; i < 30; i++)
375     {
376       Thread.sleep(2000);
377       if(_PARTITIONS == _factory._results.size())
378       {
379         break;
380       }
381     }
382 
383     Assert.assertEquals(_PARTITIONS, _factory._results.size());
384     PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
385         MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
386       
387     int messageResultCount = 0;
388     for(int i = 0; i < 10; i++)
389     {
390       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
391           .getRecord();
392       Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
393           .get("MessageCount").equals("" + (_PARTITIONS * 3)));
394       for(String key : statusUpdate.getMapFields().keySet())
395       {
396         if(key.startsWith("MessageResult "))
397         {
398           messageResultCount ++;
399           Assert.assertTrue(statusUpdate.getMapField(key).size() > 1);
400         }
401       }
402       if(messageResultCount == _PARTITIONS * 3)
403       {
404         break;
405       }
406       else
407       {
408         Thread.sleep(2000);
409       }
410     }
411     Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
412     int count = 0;
413     for (Set<String> val : _factory._results.values())
414     {
415       count += val.size();
416     }
417     Assert.assertEquals(count, _PARTITIONS * 3);
418     
419     // test the ZkPathDataDumpTask
420     String controllerStatusPath = HelixUtil.getControllerPropertyPath(manager.getClusterName(),
421         PropertyType.STATUSUPDATES_CONTROLLER);
422     List<String> subPaths = _zkClient.getChildren(controllerStatusPath);
423     Assert.assertTrue(subPaths.size() > 0);
424     for(String subPath : subPaths)
425     {
426       String nextPath = controllerStatusPath + "/" + subPath;
427       List<String> subsubPaths = _zkClient.getChildren(nextPath);
428       Assert.assertTrue(subsubPaths.size() > 0);
429     }
430     
431     String instanceStatusPath = HelixUtil.getInstancePropertyPath(manager.getClusterName(), "localhost_" + (START_PORT),
432         PropertyType.STATUSUPDATES);
433     
434     subPaths = _zkClient.getChildren(instanceStatusPath);
435     Assert.assertTrue(subPaths.size() > 0);
436     for(String subPath : subPaths)
437     {
438       String nextPath = instanceStatusPath + "/" + subPath;
439       List<String> subsubPaths = _zkClient.getChildren(nextPath);
440       Assert.assertTrue(subsubPaths.size() > 0);
441       for(String subsubPath : subsubPaths)
442       {
443         String nextnextPath = nextPath + "/" + subsubPath;
444         Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() > 0);
445       }
446     }
447     Thread.sleep(3000);
448     ZKPathDataDumpTask dumpTask = new ZKPathDataDumpTask(manager, _zkClient, 0);
449     dumpTask.run();
450     
451     subPaths = _zkClient.getChildren(controllerStatusPath);
452     Assert.assertTrue(subPaths.size() > 0);
453     for(String subPath : subPaths)
454     {
455       String nextPath = controllerStatusPath + "/" + subPath;
456       List<String> subsubPaths = _zkClient.getChildren(nextPath);
457       Assert.assertTrue(subsubPaths.size() == 0);
458     }
459     
460     subPaths = _zkClient.getChildren(instanceStatusPath);
461     Assert.assertTrue(subPaths.size() > 0);
462     for(String subPath : subPaths)
463     {
464       String nextPath = instanceStatusPath + "/" + subPath;
465       List<String> subsubPaths = _zkClient.getChildren(nextPath);
466       Assert.assertTrue(subsubPaths.size() > 0);
467       for(String subsubPath : subsubPaths)
468       {
469         String nextnextPath = nextPath + "/" + subsubPath;
470         Assert.assertTrue(_zkClient.getChildren(nextnextPath).size() == 0);
471       }
472     }
473   }
474   
475 
476   @Test()
477   public void TestSchedulerMsg2() throws Exception
478   {
479     _factory._results.clear();
480     HelixManager manager = null;
481     for (int i = 0; i < NODE_NR; i++)
482     {
483       String hostDest = "localhost_" + (START_PORT + i);
484       _startCMResultMap.get(hostDest)._manager.getMessagingService()
485           .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
486       manager = _startCMResultMap.get(hostDest)._manager;
487     }
488 
489     Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
490         .randomUUID().toString());
491     schedulerMessage.setTgtSessionId("*");
492     schedulerMessage.setTgtName("CONTROLLER");
493     // TODO: change it to "ADMIN" ?
494     schedulerMessage.setSrcName("CONTROLLER");
495 
496     // Template for the individual message sent to each participant
497     Message msg = new Message(_factory.getMessageType(), "Template");
498     msg.setTgtSessionId("*");
499     msg.setMsgState(MessageState.NEW);
500 
501     // Criteria to send individual messages
502     Criteria cr = new Criteria();
503     cr.setInstanceName("localhost_%");
504     cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
505     cr.setSessionSpecific(false);
506     cr.setResource("%");
507     cr.setPartition("%");
508 
509     ObjectMapper mapper = new ObjectMapper();
510     SerializationConfig serializationConfig = mapper.getSerializationConfig();
511     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
512 
513     StringWriter sw = new StringWriter();
514     mapper.writeValue(sw, cr);
515 
516     String crString = sw.toString();
517 
518     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
519     schedulerMessage.getRecord().setMapField("MessageTemplate",
520         msg.getRecord().getSimpleFields());
521     schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
522     schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
523     
524     Criteria cr2 = new Criteria();
525     cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
526     cr2.setInstanceName("*");
527     cr2.setSessionSpecific(false);
528 
529     schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg2");
530     MockAsyncCallback callback = new MockAsyncCallback();
531     manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
532     String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
533     
534     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
535     Builder keyBuilder = helixDataAccessor.keyBuilder();
536     for(int i = 0;i < 10; i++ )
537     {
538       Thread.sleep(200);
539       PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
540           MessageType.SCHEDULER_MSG.toString(), msgId);
541       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
542           .getRecord();
543       if(statusUpdate.getMapFields().containsKey("Summary"))
544       {
545         break;
546       }
547     }
548     
549     Assert.assertEquals(_PARTITIONS, _factory._results.size());
550     PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
551         MessageType.SCHEDULER_MSG.toString(), msgId);
552     ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
553         .getRecord();
554     Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
555         .get("MessageCount").equals("" + (_PARTITIONS * 3)));
556     int messageResultCount = 0;
557     for(String key : statusUpdate.getMapFields().keySet())
558     {
559       if(key.startsWith("MessageResult "))
560       {
561         messageResultCount ++;
562       }
563     }
564     Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
565     
566     int count = 0;
567     for (Set<String> val : _factory._results.values())
568     {
569       count += val.size();
570     }
571     Assert.assertEquals(count, _PARTITIONS * 3);
572   }
573 
574   @Test()
575   public void TestSchedulerZeroMsg() throws Exception
576   {
577     TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory();
578     HelixManager manager = null;
579     for (int i = 0; i < NODE_NR; i++)
580     {
581       String hostDest = "localhost_" + (START_PORT + i);
582       _startCMResultMap.get(hostDest)._manager.getMessagingService()
583           .registerMessageHandlerFactory(factory.getMessageType(), factory);
584       manager = _startCMResultMap.get(hostDest)._manager;
585     }
586 
587     Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
588         .randomUUID().toString());
589     schedulerMessage.setTgtSessionId("*");
590     schedulerMessage.setTgtName("CONTROLLER");
591     // TODO: change it to "ADMIN" ?
592     schedulerMessage.setSrcName("CONTROLLER");
593 
594     // Template for the individual message sent to each participant
595     Message msg = new Message(factory.getMessageType(), "Template");
596     msg.setTgtSessionId("*");
597     msg.setMsgState(MessageState.NEW);
598 
599     // Criteria to send individual messages
600     Criteria cr = new Criteria();
601     cr.setInstanceName("localhost_DOESNOTEXIST");
602     cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
603     cr.setSessionSpecific(false);
604     cr.setResource("%");
605     cr.setPartition("%");
606 
607     ObjectMapper mapper = new ObjectMapper();
608     SerializationConfig serializationConfig = mapper.getSerializationConfig();
609     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
610 
611     StringWriter sw = new StringWriter();
612     mapper.writeValue(sw, cr);
613 
614     String crString = sw.toString();
615 
616     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
617     schedulerMessage.getRecord().setMapField("MessageTemplate",
618         msg.getRecord().getSimpleFields());
619     schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
620 
621     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
622     Builder keyBuilder = helixDataAccessor.keyBuilder();
623     PropertyKey controllerMessageKey = keyBuilder
624         .controllerMessage(schedulerMessage.getMsgId());
625     helixDataAccessor.setProperty(controllerMessageKey, schedulerMessage);
626 
627     Thread.sleep(3000);
628 
629     Assert.assertEquals(0, factory._results.size());
630     PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
631         MessageType.SCHEDULER_MSG.toString(), schedulerMessage.getMsgId());
632     for(int i = 0; i< 10; i++)
633     {
634       StatusUpdate update = helixDataAccessor.getProperty(controllerTaskStatus);
635       if(update == null || update.getRecord().getMapField("SentMessageCount") == null)
636       {
637         Thread.sleep(1000);
638       }
639     }
640     ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
641         .getRecord();
642     Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
643         .get("MessageCount").equals("0"));
644     int count = 0;
645     for (Set<String> val : factory._results.values())
646     {
647       count += val.size();
648     }
649     Assert.assertEquals(count, 0);
650   }
651   
652 
653   @Test()
654   public void TestSchedulerMsg3() throws Exception
655   {
656     _factory._results.clear();
657     HelixManager manager = null;
658     for (int i = 0; i < NODE_NR; i++)
659     {
660       String hostDest = "localhost_" + (START_PORT + i);
661       _startCMResultMap.get(hostDest)._manager.getMessagingService()
662           .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
663       // 
664       _startCMResultMap.get(hostDest)._manager.getMessagingService()
665       .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
666       manager = _startCMResultMap.get(hostDest)._manager;
667     }
668 
669     Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
670         .randomUUID().toString());
671     schedulerMessage.setTgtSessionId("*");
672     schedulerMessage.setTgtName("CONTROLLER");
673     // TODO: change it to "ADMIN" ?
674     schedulerMessage.setSrcName("CONTROLLER");
675 
676     // Template for the individual message sent to each participant
677     Message msg = new Message(_factory.getMessageType(), "Template");
678     msg.setTgtSessionId("*");
679     msg.setMsgState(MessageState.NEW);
680 
681     // Criteria to send individual messages
682     Criteria cr = new Criteria();
683     cr.setInstanceName("localhost_%");
684     cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
685     cr.setSessionSpecific(false);
686     cr.setResource("%");
687     cr.setPartition("%");
688 
689     ObjectMapper mapper = new ObjectMapper();
690     SerializationConfig serializationConfig = mapper.getSerializationConfig();
691     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
692 
693     StringWriter sw = new StringWriter();
694     mapper.writeValue(sw, cr);
695 
696     String crString = sw.toString();
697 
698     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
699     schedulerMessage.getRecord().setMapField("MessageTemplate",
700         msg.getRecord().getSimpleFields());
701     schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
702     schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
703 
704     schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg3");
705     Criteria cr2 = new Criteria();
706     cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
707     cr2.setInstanceName("*");
708     cr2.setSessionSpecific(false);
709     
710     MockAsyncCallback callback = new MockAsyncCallback();
711     cr.setInstanceName("localhost_%");
712     mapper = new ObjectMapper();
713     serializationConfig = mapper.getSerializationConfig();
714     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
715 
716     sw = new StringWriter();
717     mapper.writeValue(sw, cr);
718 
719     crString = sw.toString();
720     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
721     
722     for(int i = 0; i < 4; i++)
723     {
724       callback = new MockAsyncCallback();
725       cr.setInstanceName("localhost_"+(START_PORT + i));
726       mapper = new ObjectMapper();
727       serializationConfig = mapper.getSerializationConfig();
728       serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
729 
730       sw = new StringWriter();
731       mapper.writeValue(sw, cr);
732       schedulerMessage.setMsgId(UUID.randomUUID().toString());
733       crString = sw.toString();
734       schedulerMessage.getRecord().setSimpleField("Criteria", crString);
735       manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
736       String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
737       
738       HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
739       Builder keyBuilder = helixDataAccessor.keyBuilder();
740       
741       for(int j = 0;j < 100; j++ )
742       {
743         Thread.sleep(200);
744         PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
745             MessageType.SCHEDULER_MSG.toString(), msgId);
746         ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
747             .getRecord();
748         if(statusUpdate.getMapFields().containsKey("Summary"))
749         {
750           break;
751         }
752       }
753       
754       PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
755           MessageType.SCHEDULER_MSG.toString(), msgId);
756       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
757           .getRecord();
758       Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
759           .get("MessageCount").equals("" + (_PARTITIONS * 3 / 5)));
760       int messageResultCount = 0;
761       for(String key : statusUpdate.getMapFields().keySet())
762       {
763         if(key.startsWith("MessageResult"))
764         {
765           messageResultCount ++;
766         }
767       }
768       Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
769       
770       int count = 0;
771       //System.out.println(i);
772       for (Set<String> val : _factory._results.values())
773       {
774         //System.out.println(val);
775         count += val.size();
776       }
777       //System.out.println(count);
778       Assert.assertEquals(count, _PARTITIONS * 3/ 5 *(i+1) );
779     }
780   }
781   
782 
783   @Test()
784   public void TestSchedulerMsg4() throws Exception
785   {
786     _factory._results.clear();
787     HelixManager manager = null;
788     for (int i = 0; i < NODE_NR; i++)
789     {
790       String hostDest = "localhost_" + (START_PORT + i);
791       _startCMResultMap.get(hostDest)._manager.getMessagingService()
792           .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
793       // 
794       _startCMResultMap.get(hostDest)._manager.getMessagingService()
795       .registerMessageHandlerFactory(_factory.getMessageType(), _factory);
796       manager = _startCMResultMap.get(hostDest)._manager;
797     }
798 
799     Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
800         .randomUUID().toString());
801     schedulerMessage.setTgtSessionId("*");
802     schedulerMessage.setTgtName("CONTROLLER");
803     // TODO: change it to "ADMIN" ?
804     schedulerMessage.setSrcName("CONTROLLER");
805 
806     // Template for the individual message sent to each participant
807     Message msg = new Message(_factory.getMessageType(), "Template");
808     msg.setTgtSessionId("*");
809     msg.setMsgState(MessageState.NEW);
810 
811     // Criteria to send individual messages
812     Criteria cr = new Criteria();
813     cr.setInstanceName("localhost_%");
814     cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
815     cr.setSessionSpecific(false);
816     cr.setResource("TestDB");
817     cr.setPartition("%");
818 
819     ObjectMapper mapper = new ObjectMapper();
820     SerializationConfig serializationConfig = mapper.getSerializationConfig();
821     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
822 
823     StringWriter sw = new StringWriter();
824     mapper.writeValue(sw, cr);
825 
826     String crString = sw.toString();
827 
828     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
829     schedulerMessage.getRecord().setMapField("MessageTemplate",
830         msg.getRecord().getSimpleFields());
831     schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
832     schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
833 
834     schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg4");
835     Criteria cr2 = new Criteria();
836     cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
837     cr2.setInstanceName("*");
838     cr2.setSessionSpecific(false);
839     
840     Map<String, String> constraints = new TreeMap<String, String>();
841     constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
842     constraints.put("TRANSITION", "OFFLINE-COMPLETED");
843     constraints.put("CONSTRAINT_VALUE", "1");
844     constraints.put("INSTANCE", ".*");
845     manager.getClusterManagmentTool().setConstraint(manager.getClusterName(), 
846                                                     ConstraintType.MESSAGE_CONSTRAINT, 
847                                                     "constraint1", 
848                                                     new ConstraintItem(constraints));
849     
850     MockAsyncCallback callback = new MockAsyncCallback();
851     cr.setInstanceName("localhost_%");
852     mapper = new ObjectMapper();
853     serializationConfig = mapper.getSerializationConfig();
854     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
855 
856     sw = new StringWriter();
857     mapper.writeValue(sw, cr);
858 
859     crString = sw.toString();
860     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
861     manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
862     String msgIdPrime = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
863 
864     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
865     Builder keyBuilder = helixDataAccessor.keyBuilder();
866     ArrayList<String> msgIds = new ArrayList<String>();
867     for(int i = 0; i < NODE_NR; i++)
868     {
869       callback = new MockAsyncCallback();
870       cr.setInstanceName("localhost_"+(START_PORT + i));
871       mapper = new ObjectMapper();
872       serializationConfig = mapper.getSerializationConfig();
873       serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
874 
875       sw = new StringWriter();
876       mapper.writeValue(sw, cr);
877       schedulerMessage.setMsgId(UUID.randomUUID().toString());
878       crString = sw.toString();
879       schedulerMessage.getRecord().setSimpleField("Criteria", crString);
880       manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
881       String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
882       msgIds.add(msgId);
883     }
884     for(int i = 0; i < NODE_NR; i++)
885     {
886       String msgId = msgIds.get(i);
887       for(int j = 0;j < 100; j++ )
888       {
889         Thread.sleep(200);
890         PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
891             MessageType.SCHEDULER_MSG.toString(), msgId);
892         ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
893             .getRecord();
894         if(statusUpdate.getMapFields().containsKey("Summary"))
895         {
896           //System.err.println(msgId+" done");
897           break;
898         }
899       }
900       
901       PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
902           MessageType.SCHEDULER_MSG.toString(), msgId);
903       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
904           .getRecord();
905       Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
906           .get("MessageCount").equals("" + (_PARTITIONS * 3 / 5)));
907       int messageResultCount = 0;
908       for(String key : statusUpdate.getMapFields().keySet())
909       {
910         if(key.startsWith("MessageResult"))
911         {
912           messageResultCount ++;
913         }
914       }
915       if(messageResultCount != _PARTITIONS * 3 / 5)
916       {
917         int x = 10;
918         x = x + messageResultCount;
919       }
920       Assert.assertEquals(messageResultCount, _PARTITIONS * 3 / 5);
921     }
922     
923     for(int j = 0;j < 100; j++ )
924     {
925       Thread.sleep(200);
926       PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
927           MessageType.SCHEDULER_MSG.toString(), msgIdPrime);
928       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
929           .getRecord();
930       if(statusUpdate.getMapFields().containsKey("Summary"))
931       {
932         break;
933       }
934     }
935     int count = 0;
936     for (Set<String> val : _factory._results.values())
937     {
938       //System.out.println(val);
939       count += val.size();
940     }
941     //System.out.println(count);
942     Assert.assertEquals(count, _PARTITIONS * 3 * 2 );
943   }
944   
945   
946   @Test
947   public void TestSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException, IOException, InterruptedException
948   {
949     TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch();
950     HelixManager manager = null;
951     for (int i = 0; i < NODE_NR; i++)
952     {
953       String hostDest = "localhost_" + (START_PORT + i);
954       _startCMResultMap.get(hostDest)._manager.getMessagingService()
955           .registerMessageHandlerFactory(factory.getMessageType(), factory);
956       // 
957       _startCMResultMap.get(hostDest)._manager.getMessagingService()
958       .registerMessageHandlerFactory(factory.getMessageType(), factory);
959       manager = _startCMResultMap.get(hostDest)._manager;
960     }
961 
962     Message schedulerMessage = new Message(MessageType.SCHEDULER_MSG + "", UUID
963         .randomUUID().toString());
964     schedulerMessage.setTgtSessionId("*");
965     schedulerMessage.setTgtName("CONTROLLER");
966     // TODO: change it to "ADMIN" ?
967     schedulerMessage.setSrcName("CONTROLLER");
968 
969     // Template for the individual message sent to each participant
970     Message msg = new Message(factory.getMessageType(), "Template");
971     msg.setTgtSessionId("*");
972     msg.setMsgState(MessageState.NEW);
973 
974     // Criteria to send individual messages
975     Criteria cr = new Criteria();
976     cr.setInstanceName("localhost_%");
977     cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
978     cr.setSessionSpecific(false);
979     cr.setResource("%");
980     cr.setPartition("%");
981 
982     ObjectMapper mapper = new ObjectMapper();
983     SerializationConfig serializationConfig = mapper.getSerializationConfig();
984     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
985 
986     StringWriter sw = new StringWriter();
987     mapper.writeValue(sw, cr);
988 
989     String crString = sw.toString();
990 
991     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
992     schedulerMessage.getRecord().setMapField("MessageTemplate",
993         msg.getRecord().getSimpleFields());
994     schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1");
995     schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true");
996     schedulerMessage.getRecord().setSimpleField(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints");
997     
998     Criteria cr2 = new Criteria();
999     cr2.setRecipientInstanceType(InstanceType.CONTROLLER);
1000     cr2.setInstanceName("*");
1001     cr2.setSessionSpecific(false);
1002     
1003     
1004     MockAsyncCallback callback = new MockAsyncCallback();
1005     mapper = new ObjectMapper();
1006     serializationConfig = mapper.getSerializationConfig();
1007     serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
1008 
1009     sw = new StringWriter();
1010     mapper.writeValue(sw, cr);
1011 
1012     
1013     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
1014     Builder keyBuilder = helixDataAccessor.keyBuilder();
1015     
1016     // Set contraints that only 1 msg per participant
1017     Map<String, String> constraints = new TreeMap<String, String>();
1018     constraints.put("MESSAGE_TYPE", "STATE_TRANSITION");
1019     constraints.put("TRANSITION", "OFFLINE-COMPLETED");
1020     constraints.put("CONSTRAINT_VALUE", "1");
1021     constraints.put("INSTANCE", ".*");
1022     manager.getClusterManagmentTool().setConstraint(manager.getClusterName(), 
1023                                                     ConstraintType.MESSAGE_CONSTRAINT,
1024                                                     "constraint1", 
1025                                                     new ConstraintItem(constraints));
1026     
1027     // Send scheduler message
1028     crString = sw.toString();
1029     schedulerMessage.getRecord().setSimpleField("Criteria", crString);
1030     manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1);
1031     String msgId = callback._message.getResultMap().get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID);
1032     
1033     for(int j = 0;j < 10; j++ )
1034     {
1035       Thread.sleep(200);
1036       PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
1037           MessageType.SCHEDULER_MSG.toString(), msgId);
1038       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
1039           .getRecord();
1040       if(statusUpdate.getMapFields().containsKey("SentMessageCount"))
1041       {
1042         Assert.assertEquals(statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), ""+(_PARTITIONS * 3));
1043         break;
1044       }
1045     }
1046     
1047     for(int i = 0; i < _PARTITIONS * 3 / 5; i++)
1048     {
1049       for(int j = 0; j< 10; j++)
1050       {
1051         Thread.sleep(300);
1052         if(factory._messageCount == 5*(i+1)) break;
1053       }
1054       Thread.sleep(300);
1055       Assert.assertEquals(factory._messageCount, 5*(i+1));
1056       factory.signal();
1057       //System.err.println(i);
1058     }
1059     
1060     for(int j = 0;j < 10; j++ )
1061     {
1062       Thread.sleep(200);
1063       PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
1064           MessageType.SCHEDULER_MSG.toString(), msgId);
1065       ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
1066           .getRecord();
1067       if(statusUpdate.getMapFields().containsKey("Summary"))
1068       {
1069         break;
1070       }
1071     }
1072 
1073     Assert.assertEquals(_PARTITIONS, factory._results.size());
1074     PropertyKey controllerTaskStatus = keyBuilder.controllerTaskStatus(
1075         MessageType.SCHEDULER_MSG.toString(), msgId);
1076     ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus)
1077         .getRecord();
1078     Assert.assertTrue(statusUpdate.getMapField("SentMessageCount")
1079         .get("MessageCount").equals("" + (_PARTITIONS * 3)));
1080     int messageResultCount = 0;
1081     for(String key : statusUpdate.getMapFields().keySet())
1082     {
1083       if(key.startsWith("MessageResult "))
1084       {
1085         messageResultCount ++;
1086       }
1087     }
1088     Assert.assertEquals(messageResultCount, _PARTITIONS * 3);
1089     
1090     int count = 0;
1091     for (Set<String> val : factory._results.values())
1092     {
1093       count += val.size();
1094     }
1095     Assert.assertEquals(count, _PARTITIONS * 3);
1096     
1097     manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(), 
1098                                                     ConstraintType.MESSAGE_CONSTRAINT,
1099                                                     "constraint1");
1100     
1101   }
1102 }