View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.Map;
26  import java.util.Set;
27  
28  import org.apache.helix.HelixDataAccessor;
29  import org.apache.helix.TestHelper;
30  import org.apache.helix.ZNRecord;
31  import org.apache.helix.PropertyKey.Builder;
32  import org.apache.helix.TestHelper.StartCMResult;
33  import org.apache.helix.controller.HelixControllerMain;
34  import org.apache.helix.controller.stages.ClusterDataCache;
35  import org.apache.helix.integration.TestAutoRebalancePartitionLimit.ExternalViewBalancedVerifier;
36  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
37  import org.apache.helix.manager.zk.ZNRecordSerializer;
38  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
39  import org.apache.helix.manager.zk.ZkClient;
40  import org.apache.helix.model.ExternalView;
41  import org.apache.helix.model.IdealState.IdealStateModeProperty;
42  import org.apache.helix.tools.ClusterSetup;
43  import org.apache.helix.tools.ClusterStateVerifier;
44  import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
45  import org.apache.log4j.Logger;
46  import org.testng.Assert;
47  import org.testng.annotations.BeforeClass;
48  import org.testng.annotations.Test;
49  
50  
51  public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerCheck
52  {
53    private static final Logger LOG = Logger.getLogger(TestAutoRebalance.class.getName());
54    String db2 = TEST_DB+"2";
55    String _tag = "SSDSSD";
56    @BeforeClass
57    public void beforeClass() throws Exception
58    {
59      // Logger.getRootLogger().setLevel(Level.INFO);
60      System.out.println("START " + CLASS_NAME + " at "
61          + new Date(System.currentTimeMillis()));
62  
63      _zkClient = new ZkClient(ZK_ADDR);
64      _zkClient.setZkSerializer(new ZNRecordSerializer());
65      String namespace = "/" + CLUSTER_NAME;
66      if (_zkClient.exists(namespace))
67      {
68        _zkClient.deleteRecursive(namespace);
69      }
70      _setupTool = new ClusterSetup(ZK_ADDR);
71  
72      // setup storage cluster
73      _setupTool.addCluster(CLUSTER_NAME, true);
74      _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS, STATE_MODEL, IdealStateModeProperty.AUTO_REBALANCE+"");
75      
76      _setupTool.addResourceToCluster(CLUSTER_NAME, db2, _PARTITIONS, "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE+"");
77      
78      
79      for (int i = 0; i < NODE_NR; i++)
80      {
81        String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
82        _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
83      }
84      
85      _setupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
86      
87      for (int i = 0; i < 3; i++)
88      {
89        String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
90        _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, _tag);
91      }
92  
93      _setupTool.rebalanceCluster(CLUSTER_NAME, db2, 1, "ucpx",_tag);
94      
95      // start dummy participants
96      for (int i = 0; i < NODE_NR; i++)
97      {
98        String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
99        if (_startCMResultMap.get(instanceName) != null)
100       {
101         LOG.error("fail to start particpant:" + instanceName
102             + "(participant with same name already exists)");
103       }
104       else
105       {
106         StartCMResult result =
107             TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName);
108         _startCMResultMap.put(instanceName, result);
109       }
110     }
111 
112     // start controller
113     String controllerName = CONTROLLER_PREFIX + "_0";
114     StartCMResult startResult =
115         TestHelper.startController(CLUSTER_NAME,
116                                    controllerName,
117                                    ZK_ADDR,
118                                    HelixControllerMain.STANDALONE);
119     _startCMResultMap.put(controllerName, startResult);
120 
121     boolean result =
122         ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
123                                                                               CLUSTER_NAME, TEST_DB));
124 
125     Assert.assertTrue(result);
126     
127     
128   }
129   
130 
131   @Test()
132   public void testDropResourceAutoRebalance() throws Exception
133   {
134     // add a resource to be dropped
135     _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB", _PARTITIONS, "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE+"");
136      
137     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB", 1);
138 
139     boolean result =
140             ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
141                     CLUSTER_NAME, "MyDB"));
142     Assert.assertTrue(result);
143 
144     String command =
145         "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + "MyDB";
146     ClusterSetup.processCommandLineArgs(command.split(" "));
147 
148     TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
149                                  30 * 1000,
150                                  CLUSTER_NAME,
151                                  "MyDB",
152                                  TestHelper.<String> setOf("localhost_12918",
153                                                            "localhost_12919",
154                                                            "localhost_12920",
155                                                            "localhost_12921",
156                                                            "localhost_12922"),
157                                  ZK_ADDR);
158     
159  // add a resource to be dropped
160     _setupTool.addResourceToCluster(CLUSTER_NAME, "MyDB2", _PARTITIONS, "MasterSlave", IdealStateModeProperty.AUTO_REBALANCE+"");
161      
162     _setupTool.rebalanceStorageCluster(CLUSTER_NAME, "MyDB2", 3);
163 
164     result =
165             ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
166                     CLUSTER_NAME, "MyDB2"));
167     Assert.assertTrue(result);
168 
169     command =
170         "-zkSvr " + ZK_ADDR + " -dropResource " + CLUSTER_NAME + " " + "MyDB2";
171     ClusterSetup.processCommandLineArgs(command.split(" "));
172 
173     TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView",
174                                  30 * 1000,
175                                  CLUSTER_NAME,
176                                  "MyDB2",
177                                  TestHelper.<String> setOf("localhost_12918",
178                                                            "localhost_12919",
179                                                            "localhost_12920",
180                                                            "localhost_12921",
181                                                            "localhost_12922"),
182                                  ZK_ADDR);
183   }
184   
185   
186   
187   
188   @Test()
189   public void testAutoRebalance() throws Exception
190   {
191     
192     // kill 1 node
193     String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
194     _startCMResultMap.get(instanceName)._manager.disconnect();
195     Thread.currentThread().sleep(1000);
196     _startCMResultMap.get(instanceName)._thread.interrupt();
197     
198     //verifyBalanceExternalView();
199     boolean result =
200         ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
201                                                                               CLUSTER_NAME, TEST_DB));
202     Assert.assertTrue(result);
203     
204     // add 2 nodes
205     for (int i = 0; i < 2; i++)
206     {
207       String storageNodeName = PARTICIPANT_PREFIX + "_" + (1000 + i);
208       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
209       
210       StartCMResult resultx =
211           TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, storageNodeName.replace(':', '_'));
212       _startCMResultMap.put(storageNodeName, resultx);
213     }
214     Thread.sleep(1000);
215     result =
216         ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
217                                                                               CLUSTER_NAME, TEST_DB));
218     Assert.assertTrue(result);
219     
220     result =
221         ClusterStateVerifier.verifyByZkCallback(new ExternalViewBalancedVerifier(_zkClient,
222             CLUSTER_NAME, db2));
223     Assert.assertTrue(result);
224     HelixDataAccessor accessor = new ZKHelixDataAccessor( CLUSTER_NAME, new ZkBaseDataAccessor(_zkClient));
225     Builder keyBuilder = accessor.keyBuilder();
226     ExternalView ev = accessor.getProperty(keyBuilder.externalView(db2));
227     Set<String> instancesSet = new HashSet<String>();
228     for(String partitionName : ev.getRecord().getMapFields().keySet())
229     {
230       Map<String, String> assignmentMap = ev.getRecord().getMapField(partitionName);
231       for(String instance : assignmentMap.keySet())
232       {
233         instancesSet.add(instance);
234       }
235     }
236     Assert.assertEquals(instancesSet.size(), 2);
237   }
238   
239   static boolean verifyBalanceExternalView(ZNRecord externalView, int partitionCount, String masterState, int replica, int instances)
240   {
241     if(externalView == null)
242     {
243       return false;
244     }
245     Map<String, Integer> masterPartitionsCountMap = new HashMap<String, Integer>();
246     for(String partitionName : externalView.getMapFields().keySet())
247     {
248       Map<String, String> assignmentMap = externalView.getMapField(partitionName);
249       //Assert.assertTrue(assignmentMap.size() >= replica);
250       for(String instance : assignmentMap.keySet())
251       {
252         if(assignmentMap.get(instance).equals(masterState))
253         {
254           if(!masterPartitionsCountMap.containsKey(instance))
255           {
256             masterPartitionsCountMap.put(instance, 0);
257           }
258           masterPartitionsCountMap.put(instance, masterPartitionsCountMap.get(instance) + 1);
259         }
260       }
261     }
262     
263     int perInstancePartition = partitionCount / instances;
264     
265     int totalCount = 0;
266     for(String instanceName : masterPartitionsCountMap.keySet())
267     {
268       int instancePartitionCount = masterPartitionsCountMap.get(instanceName);
269       totalCount += instancePartitionCount;
270       if(!(instancePartitionCount == perInstancePartition || instancePartitionCount == perInstancePartition +1 ))
271       {
272         return false;
273       }
274       if(instancePartitionCount == perInstancePartition +1)
275       {
276         if(partitionCount % instances == 0)
277         {
278           return false;
279         }
280       }
281     }
282     if(partitionCount != totalCount)
283     {
284       return false;
285     }
286     return true;
287     
288   }
289   
290   public static class ExternalViewBalancedVerifier implements ZkVerifier
291   {
292     ZkClient _client;
293     String _clusterName;
294     String _resourceName;
295     
296     public ExternalViewBalancedVerifier(ZkClient client, String clusterName, String resourceName)
297     {
298       _client = client;
299       _clusterName = clusterName;
300       _resourceName = resourceName;
301     }
302     @Override
303     public boolean verify()
304     {
305       HelixDataAccessor accessor = new ZKHelixDataAccessor( _clusterName, new ZkBaseDataAccessor(_client));
306       Builder keyBuilder = accessor.keyBuilder();
307       int numberOfPartitions = accessor.getProperty(keyBuilder.idealStates(_resourceName)).getRecord().getListFields().size();
308       ClusterDataCache cache = new ClusterDataCache();
309       cache.refresh(accessor);
310       String masterValue = cache.getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefRef()).getStatesPriorityList().get(0);
311       int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas());
312       String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag();
313       int instances = 0;
314       for(String  liveInstanceName : cache.getLiveInstances().keySet())
315       {
316         if(cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag))
317         {
318           instances ++;
319         }
320       }
321       if(instances == 0)
322       {
323         instances = cache.getLiveInstances().size();
324       }
325       ExternalView ev = accessor.getProperty(keyBuilder.externalView(_resourceName));
326       if(ev == null)
327       {
328         return false;
329       }
330       return verifyBalanceExternalView(ev.getRecord(), numberOfPartitions, masterValue, replicas, instances);
331     }
332 
333     @Override
334     public ZkClient getZkClient()
335     {
336       return _client;
337     }
338 
339     @Override
340     public String getClusterName()
341     {
342       return _clusterName;
343     }
344     
345   }
346 }