1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.UUID;
23
24 import org.apache.helix.Criteria;
25 import org.apache.helix.InstanceType;
26 import org.apache.helix.PropertyKey.Builder;
27 import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory;
28 import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
29 import org.apache.helix.model.ExternalView;
30 import org.apache.helix.model.Message;
31 import org.apache.helix.model.Message.MessageType;
32 import org.apache.helix.tools.ClusterStateVerifier;
33 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
34 import org.testng.Assert;
35 import org.testng.annotations.Test;
36
37
38 public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase
39 {
40 @Test()
41 public void TestParticipantErrorMessageSend()
42 {
43 String participant1 = "localhost_" + START_PORT;
44 String participant2 = "localhost_" + (START_PORT + 1);
45
46 Message errorMessage1
47 = new Message(MessageType.PARTICIPANT_ERROR_REPORT, UUID.randomUUID().toString());
48 errorMessage1.setTgtSessionId("*");
49 errorMessage1.getRecord().setSimpleField(DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY, ActionOnError.DISABLE_INSTANCE.toString());
50 Criteria recipientCriteria = new Criteria();
51 recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
52 recipientCriteria.setSessionSpecific(false);
53 _startCMResultMap.get(participant1)._manager.getMessagingService().send(recipientCriteria, errorMessage1);
54
55 Message errorMessage2
56 = new Message(MessageType.PARTICIPANT_ERROR_REPORT, UUID.randomUUID().toString());
57 errorMessage2.setTgtSessionId("*");
58 errorMessage2.setResourceName("TestDB");
59 errorMessage2.setPartitionName("TestDB_14");
60 errorMessage2.getRecord().setSimpleField(DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY, ActionOnError.DISABLE_PARTITION.toString());
61 Criteria recipientCriteria2 = new Criteria();
62 recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
63 recipientCriteria2.setSessionSpecific(false);
64 _startCMResultMap.get(participant2)._manager.getMessagingService().send(recipientCriteria2, errorMessage2);
65
66 try
67 {
68 Thread.sleep(1500);
69 }
70 catch (InterruptedException e)
71 {
72
73 e.printStackTrace();
74 }
75
76 boolean result =
77 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
78 CLUSTER_NAME));
79 Assert.assertTrue(result);
80 Builder kb = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().keyBuilder();
81 ExternalView externalView = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
82
83 for(String partitionName : externalView.getRecord().getMapFields().keySet())
84 {
85 for(String hostName : externalView.getRecord().getMapField(partitionName).keySet())
86 {
87 if(hostName.equals(participant1))
88 {
89 Assert.assertTrue(externalView.getRecord().getMapField(partitionName).get(hostName).equalsIgnoreCase("OFFLINE"));
90 }
91 }
92 }
93 Assert.assertTrue(externalView.getRecord().getMapField("TestDB_14").get(participant2).equalsIgnoreCase("OFFLINE"));
94 }
95 }