1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23
24 import org.apache.helix.HelixDataAccessor;
25 import org.apache.helix.TestHelper;
26 import org.apache.helix.ZNRecord;
27 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
28 import org.apache.helix.manager.zk.ZNRecordSerializer;
29 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
30 import org.apache.helix.manager.zk.ZkClient;
31 import org.apache.helix.mock.controller.ClusterController;
32 import org.apache.helix.mock.participant.MockParticipant;
33 import org.apache.helix.model.PauseSignal;
34 import org.apache.helix.tools.ClusterSetup;
35 import org.apache.helix.tools.ClusterStateVerifier;
36 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
37 import org.testng.Assert;
38 import org.testng.annotations.Test;
39
40
41 public class TestPauseSignal extends ZkIntegrationTestBase
42 {
43 @Test()
44 public void testPauseSignal() throws Exception
45 {
46
47 String className = TestHelper.getTestClassName();
48 String methodName = TestHelper.getTestMethodName();
49 final String clusterName = className + "_" + methodName;
50
51 System.out.println("START " + clusterName + " at "
52 + new Date(System.currentTimeMillis()));
53
54 MockParticipant[] participants = new MockParticipant[5];
55
56 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918,
57 "localhost",
58 "TestDB",
59 1,
60 10,
61 5,
62 3,
63 "MasterSlave",
64 true);
65
66
67 ClusterController controller =
68 new ClusterController(clusterName, "controller_0", ZK_ADDR);
69 controller.syncStart();
70
71
72 for (int i = 0; i < 5; i++)
73 {
74 String instanceName = "localhost_" + (12918 + i);
75
76 participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
77 participants[i].syncStart();
78 }
79
80 boolean result =
81 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
82 clusterName));
83 Assert.assertTrue(result);
84
85
86 ZkClient zkClient = new ZkClient(ZK_ADDR);
87 zkClient.setZkSerializer(new ZNRecordSerializer());
88 final HelixDataAccessor tmpAccessor =
89 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
90
91 String cmd = "-zkSvr " + ZK_ADDR + " -enableCluster " + clusterName + " false";
92 ClusterSetup.processCommandLineArgs(cmd.split(" "));
93
94 tmpAccessor.setProperty(tmpAccessor.keyBuilder().pause(), new PauseSignal("pause"));
95 zkClient.close();
96
97
98 Thread.sleep(1000);
99
100
101 ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
102 setupTool.addResourceToCluster(clusterName, "TestDB1", 10, "MasterSlave");
103 setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
104
105
106 TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
107 1000,
108 clusterName,
109 "TestDB1",
110 TestHelper.<String> setOf("localhost_12918",
111 "localhost_12919",
112 "localhost_12920",
113 "localhost_12921",
114 "localhost_12922"),
115 ZK_ADDR);
116
117
118 final HelixDataAccessor accessor =
119 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
120
121 cmd = "-zkSvr " + ZK_ADDR + " -enableCluster " + clusterName + " true";
122 ClusterSetup.processCommandLineArgs(cmd.split(" "));
123 result =
124 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
125 clusterName));
126 Assert.assertTrue(result);
127
128
129 for (int i = 0; i < 5; i++)
130 {
131 participants[i].syncStop();
132 }
133
134 Thread.sleep(2000);
135 controller.syncStop();
136
137 System.out.println("END " + clusterName + " at "
138 + new Date(System.currentTimeMillis()));
139 }
140 }