View Javadoc

1   package org.apache.helix.tools;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.HashMap;
24  import java.util.Map;
25  import java.util.Set;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.apache.helix.NotificationContext;
29  import org.apache.helix.PropertyKey.Builder;
30  import org.apache.helix.TestHelper;
31  import org.apache.helix.ZNRecord;
32  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
33  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
34  import org.apache.helix.mock.controller.ClusterController;
35  import org.apache.helix.mock.participant.MockParticipant;
36  import org.apache.helix.mock.participant.ErrTransition;
37  import org.apache.helix.model.LiveInstance;
38  import org.apache.helix.model.Message;
39  import org.apache.helix.webapp.resources.JsonParameters;
40  import org.apache.log4j.Logger;
41  import org.testng.Assert;
42  import org.testng.annotations.Test;
43  
44  public class TestResetPartitionState extends AdminTestBase {
45      private final static Logger LOG = Logger
46  	    .getLogger(TestResetPartitionState.class);
47  
48      String getClusterUrl(String cluster) {
49  	return "http://localhost:" + ADMIN_PORT + "/clusters" + "/" + cluster;
50      }
51  
52      String getInstanceUrl(String cluster, String instance) {
53  	return "http://localhost:" + ADMIN_PORT + "/clusters/" + cluster
54  		+ "/instances/" + instance;
55      }
56  
57      String getResourceUrl(String cluster, String resourceGroup) {
58  	return "http://localhost:" + ADMIN_PORT + "/clusters/" + cluster
59  		+ "/resourceGroups/" + resourceGroup;
60      }
61  
62      AtomicInteger _errToOfflineInvoked = new AtomicInteger(0);
63  
64      class ErrTransitionWithResetCnt extends ErrTransition {
65  	public ErrTransitionWithResetCnt(Map<String, Set<String>> errPartitions) {
66  	    super(errPartitions);
67  	}
68  
69  	@Override
70  	public void doTransition(Message message, NotificationContext context) {
71  	    super.doTransition(message, context);
72  	    String fromState = message.getFromState();
73  	    String toState = message.getToState();
74  	    if (fromState.equals("ERROR") && toState.equals("OFFLINE")) {
75  		// System.err.println("doReset() invoked");
76  		_errToOfflineInvoked.incrementAndGet();
77  	    }
78  	}
79      }
80  
81      @Test()
82      public void testResetPartitionState() throws Exception {
83  	String className = TestHelper.getTestClassName();
84  	String methodName = TestHelper.getTestMethodName();
85  	String clusterName = className + "_" + methodName;
86  	final int n = 5;
87  
88  	System.out.println("START " + clusterName + " at "
89  		+ new Date(System.currentTimeMillis()));
90  
91  	TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
92  		"localhost", // participant name prefix
93  		"TestDB", // resource name prefix
94  		1, // resources
95  		10, // partitions per resource
96  		n, // number of nodes
97  		3, // replicas
98  		"MasterSlave", true); // do rebalance
99  
100 	// start admin thread
101 	// AdminThread adminThread = new AdminThread(ZK_ADDR, _port);
102 	// adminThread.start();
103 
104 	// start controller
105 	ClusterController controller = new ClusterController(clusterName,
106 		"controller_0", ZK_ADDR);
107 	controller.syncStart();
108 
109 	Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>();
110 	errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4"));
111 	errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_8"));
112 
113 	// start mock participants
114 	MockParticipant[] participants = new MockParticipant[n];
115 	for (int i = 0; i < n; i++) {
116 	    String instanceName = "localhost_" + (12918 + i);
117 
118 	    if (i == 0) {
119 		participants[i] = new MockParticipant(clusterName,
120 			instanceName, ZK_ADDR, new ErrTransition(errPartitions));
121 	    } else {
122 		participants[i] = new MockParticipant(clusterName,
123 			instanceName, ZK_ADDR);
124 	    }
125 	    participants[i].syncStart();
126 	}
127 
128 	// verify cluster
129 	Map<String, Map<String, String>> errStateMap = new HashMap<String, Map<String, String>>();
130 	errStateMap.put("TestDB0", new HashMap<String, String>());
131 	errStateMap.get("TestDB0").put("TestDB0_4", "localhost_12918");
132 	errStateMap.get("TestDB0").put("TestDB0_8", "localhost_12918");
133 	boolean result = ClusterStateVerifier
134 		.verifyByZkCallback((new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
135 			ZK_ADDR, clusterName, errStateMap)));
136 	Assert.assertTrue(result, "Cluster verification fails");
137 
138 	// reset a non-exist partition, should throw exception
139 	String hostName = "localhost_12918";
140 	String instanceUrl = getInstanceUrl(clusterName, hostName);
141 
142 	Map<String, String> paramMap = new HashMap<String, String>();
143 	paramMap.put(JsonParameters.MANAGEMENT_COMMAND,
144 		ClusterSetup.resetPartition);
145 	paramMap.put(JsonParameters.PARTITION, "TestDB0_nonExist");
146 	paramMap.put(JsonParameters.RESOURCE, "TestDB0");
147 	LOG.info("IGNORABLE exception: test reset non-exist partition");
148 	TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl,
149 		paramMap, true);
150 
151 	// reset 2 error partitions
152 	errPartitions.clear();
153 	participants[0].setTransition(new ErrTransitionWithResetCnt(
154 		errPartitions));
155 	clearStatusUpdate(clusterName, "localhost_12918", "TestDB0",
156 		"TestDB0_4");
157 	_errToOfflineInvoked.set(0);
158 
159 	paramMap.put(JsonParameters.PARTITION, "TestDB0_4 TestDB0_8");
160 	TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl,
161 		paramMap, false);
162 
163 	for (int i = 0; i < 10; i++) {
164 	    Thread.sleep(400); // wait reset to be done
165 	    LOG.info("IGNORABLE exception: test reset non-error partition");
166 	    TestHelixAdminScenariosRest.assertSuccessPostOperation(instanceUrl,
167 		    paramMap, true);
168 
169 	    result = ClusterStateVerifier
170 		    .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(
171 			    ZK_ADDR, clusterName));
172 	    if (result == true) {
173 		break;
174 	    }
175 	}
176 	
177 	Assert.assertTrue(result);
178 	Assert.assertEquals(_errToOfflineInvoked.get(), 2,
179 		"reset() should be invoked 2 times");
180 
181 	// clean up
182 	// wait for all zk callbacks done
183 	Thread.sleep(1000);
184 	// adminThread.stop();
185 	controller.syncStop();
186 	for (int i = 0; i < 5; i++) {
187 	    participants[i].syncStop();
188 	}
189 
190 	System.out.println("END " + clusterName + " at "
191 		+ new Date(System.currentTimeMillis()));
192     }
193 
194     private void clearStatusUpdate(String clusterName, String instance,
195 	    String resource, String partition) {
196 	// clear status update for error partition so verify() will not fail on
197 	// old
198 	// errors
199 	ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName,
200 		new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
201 	Builder keyBuilder = accessor.keyBuilder();
202 
203 	LiveInstance liveInstance = accessor.getProperty(keyBuilder
204 		.liveInstance(instance));
205 	accessor.removeProperty(keyBuilder.stateTransitionStatus(instance,
206 		liveInstance.getSessionId(), resource, partition));
207 
208     }
209 
210     // TODO: throw exception in reset()
211 }