View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.UUID;
23  
24  import org.apache.helix.Criteria;
25  import org.apache.helix.InstanceType;
26  import org.apache.helix.PropertyKey.Builder;
27  import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory;
28  import org.apache.helix.manager.zk.DefaultParticipantErrorMessageHandlerFactory.ActionOnError;
29  import org.apache.helix.model.ExternalView;
30  import org.apache.helix.model.Message;
31  import org.apache.helix.model.Message.MessageType;
32  import org.apache.helix.tools.ClusterStateVerifier;
33  import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
34  import org.testng.Assert;
35  import org.testng.annotations.Test;
36  
37  
38  public class TestParticipantErrorMessage extends ZkStandAloneCMTestBase
39  {
40    @Test()
41    public void TestParticipantErrorMessageSend()
42    {
43      String participant1 = "localhost_" + START_PORT;
44      String participant2 = "localhost_" + (START_PORT + 1);
45      
46      Message errorMessage1
47        = new Message(MessageType.PARTICIPANT_ERROR_REPORT, UUID.randomUUID().toString());
48      errorMessage1.setTgtSessionId("*");
49      errorMessage1.getRecord().setSimpleField(DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY, ActionOnError.DISABLE_INSTANCE.toString());
50      Criteria recipientCriteria = new Criteria();
51      recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
52      recipientCriteria.setSessionSpecific(false);
53      _startCMResultMap.get(participant1)._manager.getMessagingService().send(recipientCriteria, errorMessage1);
54      
55      Message errorMessage2
56      = new Message(MessageType.PARTICIPANT_ERROR_REPORT, UUID.randomUUID().toString());
57      errorMessage2.setTgtSessionId("*");
58      errorMessage2.setResourceName("TestDB");
59      errorMessage2.setPartitionName("TestDB_14");
60      errorMessage2.getRecord().setSimpleField(DefaultParticipantErrorMessageHandlerFactory.ACTIONKEY, ActionOnError.DISABLE_PARTITION.toString());
61      Criteria recipientCriteria2 = new Criteria();
62      recipientCriteria2.setRecipientInstanceType(InstanceType.CONTROLLER);
63      recipientCriteria2.setSessionSpecific(false);
64      _startCMResultMap.get(participant2)._manager.getMessagingService().send(recipientCriteria2, errorMessage2);
65    
66      try
67      {
68        Thread.sleep(1500);
69      }
70      catch (InterruptedException e)
71      {
72        // TODO Auto-generated catch block
73        e.printStackTrace();
74      }
75  
76      boolean result =
77          ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
78                                                                                   CLUSTER_NAME));
79      Assert.assertTrue(result);
80      Builder kb = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().keyBuilder();
81      ExternalView externalView = _startCMResultMap.get(participant2)._manager.getHelixDataAccessor().getProperty(kb.externalView("TestDB"));
82      
83      for(String partitionName : externalView.getRecord().getMapFields().keySet())
84      {
85        for(String hostName :  externalView.getRecord().getMapField(partitionName).keySet())
86        {
87          if(hostName.equals(participant1))
88          {
89            Assert.assertTrue(externalView.getRecord().getMapField(partitionName).get(hostName).equalsIgnoreCase("OFFLINE"));
90          }
91        }
92      }
93      Assert.assertTrue(externalView.getRecord().getMapField("TestDB_14").get(participant2).equalsIgnoreCase("OFFLINE"));
94    }
95  }