1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23 import java.util.concurrent.atomic.AtomicBoolean;
24
25 import org.apache.helix.InstanceType;
26 import org.apache.helix.NotificationContext;
27 import org.apache.helix.TestHelper;
28 import org.apache.helix.ZkHelixTestManager;
29 import org.apache.helix.ZkTestHelper;
30 import org.apache.helix.mock.controller.ClusterController;
31 import org.apache.helix.mock.participant.MockParticipant;
32 import org.apache.helix.mock.participant.MockTransition;
33 import org.apache.helix.model.Message;
34 import org.apache.helix.tools.ClusterStateVerifier;
35 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
36 import org.apache.log4j.Level;
37 import org.apache.log4j.Logger;
38 import org.testng.Assert;
39
40
41 public class TestSessionExpiryInTransition extends ZkIntegrationTestBase
42 {
43
44 public class SessionExpiryTransition extends MockTransition
45 {
46 private final AtomicBoolean _done = new AtomicBoolean();
47
48 @Override
49 public void doTransition(Message message, NotificationContext context)
50 {
51 ZkHelixTestManager manager = (ZkHelixTestManager)context.getManager();
52
53 String instance = message.getTgtName();
54 String partition = message.getPartitionName();
55 if (instance.equals("localhost_12918")
56 && partition.equals("TestDB0_1")
57 && _done.getAndSet(true) == false)
58 {
59 try
60 {
61 ZkTestHelper.expireSession(manager.getZkClient());
62 }
63 catch (Exception e)
64 {
65
66 e.printStackTrace();
67 }
68 }
69 }
70 }
71
72
73
74
75 public void testSessionExpiryInTransition() throws Exception
76 {
77 Logger.getRootLogger().setLevel(Level.WARN);
78
79 String className = TestHelper.getTestClassName();
80 String methodName = TestHelper.getTestMethodName();
81 final String clusterName = className + "_" + methodName;
82
83 System.out.println("START " + clusterName + " at "
84 + new Date(System.currentTimeMillis()));
85
86 MockParticipant[] participants = new MockParticipant[5];
87
88 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918,
89 "localhost",
90 "TestDB",
91 1,
92 10,
93 5,
94 3,
95 "MasterSlave",
96 true);
97
98
99 ClusterController controller =
100 new ClusterController(clusterName, "controller_0", ZK_ADDR);
101 controller.syncStart();
102
103
104 for (int i = 0; i < 5; i++)
105 {
106 String instanceName = "localhost_" + (12918 + i);
107 ZkHelixTestManager manager =
108 new ZkHelixTestManager(clusterName,
109 instanceName,
110 InstanceType.PARTICIPANT,
111 ZK_ADDR);
112 participants[i] = new MockParticipant(manager, new SessionExpiryTransition());
113 participants[i].syncStart();
114 }
115
116 boolean result =
117 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
118 clusterName));
119 Assert.assertTrue(result);
120
121
122 for (int i = 0; i < 5; i++)
123 {
124 participants[i].syncStop();
125 }
126
127 Thread.sleep(2000);
128 controller.syncStop();
129
130 System.out.println("END " + clusterName + " at "
131 + new Date(System.currentTimeMillis()));
132
133 }
134 }