View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  
25  import org.apache.helix.InstanceType;
26  import org.apache.helix.NotificationContext;
27  import org.apache.helix.TestHelper;
28  import org.apache.helix.ZkHelixTestManager;
29  import org.apache.helix.ZkTestHelper;
30  import org.apache.helix.mock.controller.ClusterController;
31  import org.apache.helix.mock.participant.MockParticipant;
32  import org.apache.helix.mock.participant.MockTransition;
33  import org.apache.helix.model.Message;
34  import org.apache.helix.tools.ClusterStateVerifier;
35  import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
36  import org.apache.log4j.Level;
37  import org.apache.log4j.Logger;
38  import org.testng.Assert;
39  
40  
41  public class TestSessionExpiryInTransition extends ZkIntegrationTestBase
42  {
43  
44    public class SessionExpiryTransition extends MockTransition
45    {
46      private final AtomicBoolean _done = new AtomicBoolean();
47  
48      @Override
49      public void doTransition(Message message, NotificationContext context)
50      {
51        ZkHelixTestManager manager = (ZkHelixTestManager)context.getManager();
52       
53        String instance = message.getTgtName();
54        String partition = message.getPartitionName();
55        if (instance.equals("localhost_12918")
56            && partition.equals("TestDB0_1")  // TestDB0_1 is SLAVE on localhost_12918
57            && _done.getAndSet(true) == false)
58        {
59          try
60          {
61            ZkTestHelper.expireSession(manager.getZkClient());
62          }
63          catch (Exception e)
64          {
65            // TODO Auto-generated catch block
66            e.printStackTrace();
67          }
68        }
69      }
70    }
71   
72    // TODO: disable test first until we have a clean design in handling zk disconnect/session-expiry
73    // when there is pending messages
74    // @Test
75    public void testSessionExpiryInTransition() throws Exception
76    {
77      Logger.getRootLogger().setLevel(Level.WARN);
78  
79      String className = TestHelper.getTestClassName();
80      String methodName = TestHelper.getTestMethodName();
81      final String clusterName = className + "_" + methodName;
82  
83      System.out.println("START " + clusterName + " at "
84          + new Date(System.currentTimeMillis()));
85  
86      MockParticipant[] participants = new MockParticipant[5];
87  
88      TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
89                              "localhost", // participant name prefix
90                              "TestDB", // resource name prefix
91                              1, // resources
92                              10, // partitions per resource
93                              5, // number of nodes
94                              3, // replicas
95                              "MasterSlave",
96                              true); // do rebalance
97  
98      // start controller
99      ClusterController controller =
100         new ClusterController(clusterName, "controller_0", ZK_ADDR);
101     controller.syncStart();
102 
103     // start participants
104     for (int i = 0; i < 5; i++)
105     {
106       String instanceName = "localhost_" + (12918 + i);
107       ZkHelixTestManager manager =
108           new ZkHelixTestManager(clusterName,
109                                  instanceName,
110                                  InstanceType.PARTICIPANT,
111                                  ZK_ADDR);
112       participants[i] = new MockParticipant(manager, new SessionExpiryTransition());
113       participants[i].syncStart();
114     }
115 
116     boolean result =
117         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
118                                                                                  clusterName));
119     Assert.assertTrue(result);
120 
121     // clean up
122     for (int i = 0; i < 5; i++)
123     {
124       participants[i].syncStop();
125     }
126 
127     Thread.sleep(2000);
128     controller.syncStop();
129 
130     System.out.println("END " + clusterName + " at "
131         + new Date(System.currentTimeMillis()));
132 
133   }
134 }