1 package org.apache.helix.tools;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23 import java.util.HashMap;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import org.apache.helix.NotificationContext;
29 import org.apache.helix.PropertyKey.Builder;
30 import org.apache.helix.TestHelper;
31 import org.apache.helix.ZNRecord;
32 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
33 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
34 import org.apache.helix.mock.controller.ClusterController;
35 import org.apache.helix.mock.participant.MockParticipant;
36 import org.apache.helix.mock.participant.ErrTransition;
37 import org.apache.helix.model.LiveInstance;
38 import org.apache.helix.model.Message;
39 import org.apache.helix.webapp.resources.JsonParameters;
40 import org.apache.log4j.Logger;
41 import org.testng.Assert;
42 import org.testng.annotations.Test;
43
44 public class TestResetPartitionState extends AdminTestBase {
45 private final static Logger LOG = Logger
46 .getLogger(TestResetPartitionState.class);
47
48 String getClusterUrl(String cluster) {
49 return "http://localhost:" + ADMIN_PORT + "/clusters" + "/" + cluster;
50 }
51
52 String getInstanceUrl(String cluster, String instance) {
53 return "http://localhost:" + ADMIN_PORT + "/clusters/" + cluster
54 + "/instances/" + instance;
55 }
56
57 String getResourceUrl(String cluster, String resourceGroup) {
58 return "http://localhost:" + ADMIN_PORT + "/clusters/" + cluster
59 + "/resourceGroups/" + resourceGroup;
60 }
61
62 AtomicInteger _errToOfflineInvoked = new AtomicInteger(0);
63
64 class ErrTransitionWithResetCnt extends ErrTransition {
65 public ErrTransitionWithResetCnt(Map<String, Set<String>> errPartitions) {
66 super(errPartitions);
67 }
68
69 @Override
70 public void doTransition(Message message, NotificationContext context) {
71 super.doTransition(message, context);
72 String fromState = message.getFromState();
73 String toState = message.getToState();
74 if (fromState.equals("ERROR") && toState.equals("OFFLINE")) {
75
76 _errToOfflineInvoked.incrementAndGet();
77 }
78 }
79 }
80
81 @Test()
82 public void testResetPartitionState() throws Exception {
83 String className = TestHelper.getTestClassName();
84 String methodName = TestHelper.getTestMethodName();
85 String clusterName = className + "_" + methodName;
86 final int n = 5;
87
88 System.out.println("START " + clusterName + " at "
89 + new Date(System.currentTimeMillis()));
90
91 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918,
92 "localhost",
93 "TestDB",
94 1,
95 10,
96 n,
97 3,
98 "MasterSlave", true);
99
100
101
102
103
104
105 ClusterController controller = new ClusterController(clusterName,
106 "controller_0", ZK_ADDR);
107 controller.syncStart();
108
109 Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
110 errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
111 errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
112
113
114 MockParticipant[] participants = new MockParticipant[n];
115 for (int i = 0; i < n; i++) {
116 String instanceName = "localhost_" + (12918 + i);
117
118 if (i == 0) {
119 participants[i] = new MockParticipant(clusterName,
120 instanceName, ZK_ADDR, new ErrTransition(errPartitions));
121 } else {
122 participants[i] = new MockParticipant(clusterName,
123 instanceName, ZK_ADDR);
124 }
125 participants[i].syncStart();
126 }
127
128
129 Map<String, Map<String, String>> errStateMap = new HashMap<String, Map<String, String>>();
130 errStateMap.put("TestDB0", new HashMap<String, String>());
131 errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
132 errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
133 boolean result = ClusterStateVerifier
134 .verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
135 ZK_ADDR, clusterName, errStateMap)));
136 Assert.assertTrue(result, "Cluster verification fails");
137
138
139 String hostName = "localhost_12918";
140 String instanceUrl = getInstanceUrl(clusterName, hostName);
141
142 Map<String, String> paramMap = new HashMap<String, String>();
143 paramMap.put(JsonParameters.MANAGEMENT_COMMAND,
144 ClusterSetup.resetPartition);
145 paramMap.put(JsonParameters.PARTITION, "TestDB0_nonExist");
146 paramMap.put(JsonParameters.RESOURCE, "TestDB0");
147 LOG.info("IGNORABLE exception: test reset non-exist partition");
148 TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl,
149 paramMap, true);
150
151
152 errPartitions.clear();
153 participants[0].setTransition(new ErrTransitionWithResetCnt(
154 errPartitions));
155 clearStatusUpdate(clusterName, "localhost_12918", "TestDB0",
156 "TestDB0_4");
157 _errToOfflineInvoked.set(0);
158
159 paramMap.put(JsonParameters.PARTITION, "TestDB0_4 TestDB0_8");
160 TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl,
161 paramMap, false);
162
163 for (int i = 0; i < 10; i++) {
164 Thread.sleep(400);
165 LOG.info("IGNORABLE exception: test reset non-error partition");
166 TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl,
167 paramMap, true);
168
169 result = ClusterStateVerifier
170 .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
171 ZK_ADDR, clusterName));
172 if (result == true) {
173 break;
174 }
175 }
176
177 Assert.assertTrue(result);
178 Assert.assertEquals(_errToOfflineInvoked.get(), 2,
179 "reset() should be invoked 2 times");
180
181
182
183 Thread.sleep(1000);
184
185 controller.syncStop();
186 for (int i = 0; i < 5; i++) {
187 participants[i].syncStop();
188 }
189
190 System.out.println("END " + clusterName + " at "
191 + new Date(System.currentTimeMillis()));
192 }
193
194 private void clearStatusUpdate(String clusterName, String instance,
195 String resource, String partition) {
196
197
198
199 ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
200 new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
201 Builder keyBuilder = accessor.keyBuilder();
202
203 LiveInstance liveInstance = accessor.getProperty(keyBuilder
204 .liveInstance(instance));
205 accessor.removeProperty(keyBuilder.stateTransitionStatus(instance,
206 liveInstance.getSessionId(), resource, partition));
207
208 }
209
210
211 }