1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23
24 import org.apache.helix.InstanceType;
25 import org.apache.helix.TestHelper;
26 import org.apache.helix.ZkHelixTestManager;
27 import org.apache.helix.ZkTestHelper;
28 import org.apache.helix.mock.participant.MockParticipant;
29 import org.apache.helix.tools.ClusterSetup;
30 import org.apache.helix.tools.ClusterStateVerifier;
31 import org.apache.log4j.Logger;
32 import org.testng.Assert;
33 import org.testng.annotations.Test;
34
35
36 public class TestStandAloneCMSessionExpiry extends ZkIntegrationTestBase
37 {
38 private static Logger LOG = Logger.getLogger(TestStandAloneCMSessionExpiry.class);
39
40 @Test()
41 public void testStandAloneCMSessionExpiry() throws Exception
42 {
43
44 String className = TestHelper.getTestClassName();
45 String methodName = TestHelper.getTestMethodName();
46 String clusterName = className + "_" + methodName;
47
48 System.out.println("START " + clusterName + " at "
49 + new Date(System.currentTimeMillis()));
50
51 TestHelper.setupCluster(clusterName,
52 ZK_ADDR,
53 12918,
54 PARTICIPANT_PREFIX,
55 "TestDB",
56 1,
57 20,
58 5,
59 3,
60 "MasterSlave",
61 true);
62
63 MockParticipant[] participants = new MockParticipant[5];
64 for (int i = 0; i < 5; i++)
65 {
66 String instanceName = "localhost_" + (12918 + i);
67 ZkHelixTestManager manager =
68 new ZkHelixTestManager(clusterName,
69 instanceName,
70 InstanceType.PARTICIPANT,
71 ZK_ADDR);
72 participants[i] = new MockParticipant(manager, null);
73 participants[i].syncStart();
74 }
75
76 ZkHelixTestManager controller =
77 new ZkHelixTestManager(clusterName,
78 "controller_0",
79 InstanceType.CONTROLLER,
80 ZK_ADDR);
81 controller.connect();
82
83 boolean result;
84 result =
85 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
86 clusterName));
87 Assert.assertTrue(result);
88
89
90 ZkHelixTestManager participantToExpire = (ZkHelixTestManager)participants[1].getManager();
91
92 System.out.println("Expire participant session");
93 String oldSessionId = participantToExpire.getSessionId();
94
95 ZkTestHelper.expireSession(participantToExpire.getZkClient());
96 String newSessionId = participantToExpire.getSessionId();
97 System.out.println("oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
98 Assert.assertTrue(newSessionId.compareTo(oldSessionId) > 0, "Session id should be increased after expiry");
99
100 ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
101 setupTool.addResourceToCluster(clusterName, "TestDB1", 10, "MasterSlave");
102 setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
103
104 result =
105 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
106 clusterName));
107 Assert.assertTrue(result);
108
109
110 System.out.println("Expire controller session");
111 oldSessionId = controller.getSessionId();
112 ZkTestHelper.expireSession(controller.getZkClient());
113 newSessionId = controller.getSessionId();
114 System.out.println("oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId);
115 Assert.assertTrue(newSessionId.compareTo(oldSessionId) > 0, "Session id should be increased after expiry");
116
117 setupTool.addResourceToCluster(clusterName, "TestDB2", 8, "MasterSlave");
118 setupTool.rebalanceStorageCluster(clusterName, "TestDB2", 3);
119
120 result =
121 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
122 clusterName));
123 Assert.assertTrue(result);
124
125
126 System.out.println("Clean up ...");
127
128 controller.disconnect();
129 Thread.sleep(100);
130 for (int i = 0; i < 5; i++)
131 {
132 participants[i].syncStop();
133 }
134
135 System.out.println("END " + clusterName + " at "
136 + new Date(System.currentTimeMillis()));
137
138 }
139
140 }