View Javadoc

1   package org.apache.helix;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Set;
27  import java.util.TreeMap;
28  import java.util.TreeSet;
29  
30  import org.apache.helix.ZNRecord;
31  import org.apache.helix.model.IdealState;
32  import org.apache.helix.tools.ClusterSetup;
33  import org.apache.helix.tools.DefaultIdealStateCalculator;
34  import org.apache.helix.util.RebalanceUtil;
35  import org.testng.Assert;
36  import org.testng.AssertJUnit;
37  import org.testng.annotations.Test;
38  
39  
40  
41  public class TestEspressoStorageClusterIdealState
42  {
43    @Test ()
44    public void testEspressoStorageClusterIdealState() throws Exception
45    {
46      List<String> instanceNames = new ArrayList<String>();
47      for(int i = 0;i < 5; i++)
48      {
49        instanceNames.add("localhost:123" + i);
50      }
51      int partitions = 8, replicas = 0;
52      Map<String, Object> result0 = DefaultIdealStateCalculator.calculateInitialIdealState(instanceNames, partitions, replicas);
53      Verify(result0, partitions,replicas);
54  
55      partitions = 8192;
56      replicas = 3;
57  
58      instanceNames.clear();
59      for(int i = 0;i < 20; i++)
60      {
61        instanceNames.add("localhost:123" + i);
62      }
63      Map<String, Object> resultOriginal = DefaultIdealStateCalculator.calculateInitialIdealState(instanceNames, partitions, replicas);
64  
65      Verify(resultOriginal, partitions,replicas);
66      printStat(resultOriginal);
67  
68      Map<String, Object> result1 = DefaultIdealStateCalculator.calculateInitialIdealState(instanceNames, partitions, replicas);
69  
70      List<String> instanceNames2 = new ArrayList<String>();
71      for(int i = 30;i < 35; i++)
72      {
73        instanceNames2.add("localhost:123" + i);
74      }
75  
76      DefaultIdealStateCalculator.calculateNextIdealState(instanceNames2, result1);
77  
78      List<String> instanceNames3 = new ArrayList<String>();
79      for(int i = 35;i < 40; i++)
80      {
81        instanceNames3.add("localhost:123" + i);
82      }
83  
84      DefaultIdealStateCalculator.calculateNextIdealState(instanceNames3, result1);
85      Double masterKeepRatio = 0.0, slaveKeepRatio = 0.0;
86      Verify(result1, partitions,replicas);
87      double[] result = compareResult(resultOriginal, result1);
88      masterKeepRatio = result[0];
89      slaveKeepRatio = result[1];
90      Assert.assertTrue(0.66 < masterKeepRatio && 0.67 > masterKeepRatio);
91      Assert.assertTrue(0.66 < slaveKeepRatio && 0.67 > slaveKeepRatio);
92  
93    }
94    
95    @Test
96    public void testRebalance2()
97    {
98      int partitions = 1256, replicas = 3;
99      List<String> instanceNames = new ArrayList<String>();
100     
101     for(int i = 0;i < 10; i++)
102     {
103       instanceNames.add("localhost:123" + i);
104     }
105     
106     Map<String, Object> resultOriginal = DefaultIdealStateCalculator.calculateInitialIdealState(instanceNames, partitions, replicas);
107     
108     ZNRecord idealState1 = DefaultIdealStateCalculator.convertToZNRecord(resultOriginal, "TestDB", "MASTER", "SLAVE");
109     
110     Map<String, Object> result1 = RebalanceUtil.buildInternalIdealState(new IdealState(idealState1));
111     
112     List<String> instanceNames2 = new ArrayList<String>();
113     for(int i = 30;i < 35; i++)
114     {
115       instanceNames2.add("localhost:123" + i);
116     }
117     
118     Map<String, Object> result2 = DefaultIdealStateCalculator.calculateNextIdealState(instanceNames2, result1);
119     
120     Verify(resultOriginal, partitions,replicas);
121     Verify(result2, partitions,replicas);
122     Double masterKeepRatio = 0.0, slaveKeepRatio = 0.0;
123     double[] result = compareResult(resultOriginal, result2);
124     masterKeepRatio = result[0];
125     slaveKeepRatio = result[1];
126     Assert.assertTrue(0.66 < masterKeepRatio && 0.67 > masterKeepRatio);
127     Assert.assertTrue(0.66 < slaveKeepRatio && 0.67 > slaveKeepRatio);
128   }
129 
130   public static void Verify(Map<String, Object> result, int partitions, int replicas)
131   {
132     Map<String, List<Integer>> masterAssignmentMap = (Map<String, List<Integer>>) (result.get("MasterAssignmentMap"));
133     Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap = (Map<String, Map<String, List<Integer>>>)(result.get("SlaveAssignmentMap"));
134 
135     AssertJUnit.assertTrue( partitions == (Integer)(result.get("partitions")));
136 
137     // Verify master partitions covers all master partitions on each node
138     Map<Integer, Integer> masterCounterMap = new TreeMap<Integer, Integer>();
139     for(int i = 0;i<partitions; i++)
140     {
141       masterCounterMap.put(i, 0);
142     }
143 
144     int minMasters = Integer.MAX_VALUE, maxMasters = Integer.MIN_VALUE;
145     for(String instanceName : masterAssignmentMap.keySet())
146     {
147       List<Integer> masterList = masterAssignmentMap.get(instanceName);
148       // the assert needs to be changed when weighting is introduced
149       // AssertJUnit.assertTrue(masterList.size() == partitions /masterAssignmentMap.size() | masterList.size() == (partitions /masterAssignmentMap.size()+1) );
150 
151       for(Integer x : masterList)
152       {
153         AssertJUnit.assertTrue(masterCounterMap.get(x) == 0);
154         masterCounterMap.put(x,1);
155       }
156       if(minMasters > masterList.size())
157       {
158         minMasters = masterList.size();
159       }
160       if(maxMasters < masterList.size())
161       {
162         maxMasters = masterList.size();
163       }
164     }
165     // Master partition should be evenly distributed most of the time
166     System.out.println("Masters: max: "+maxMasters+" Min:"+ minMasters);
167     // Each master partition should occur only once
168     for(int i = 0;i < partitions; i++)
169     {
170       AssertJUnit.assertTrue(masterCounterMap.get(i) == 1);
171     }
172     AssertJUnit.assertTrue(masterCounterMap.size() == partitions);
173 
174     // for each node, verify the master partitions and the slave partition assignment map
175     if(replicas == 0)
176     {
177       AssertJUnit.assertTrue(nodeSlaveAssignmentMap.size() == 0);
178       return;
179     }
180 
181     AssertJUnit.assertTrue(masterAssignmentMap.size() == nodeSlaveAssignmentMap.size());
182     for(String instanceName: masterAssignmentMap.keySet())
183     {
184       AssertJUnit.assertTrue(nodeSlaveAssignmentMap.containsKey(instanceName));
185 
186       Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
187       Map<Integer, Integer> slaveCountMap = new TreeMap<Integer, Integer>();
188       List<Integer> masterList = masterAssignmentMap.get(instanceName);
189 
190       for(Integer masterPartitionId : masterList)
191       {
192         slaveCountMap.put(masterPartitionId, 0);
193       }
194       // Make sure that masterList are covered replica times by the slave assignment.
195       int minSlaves = Integer.MAX_VALUE, maxSlaves = Integer.MIN_VALUE;
196       for(String hostInstance : slaveAssignmentMap.keySet())
197       {
198         List<Integer> slaveAssignment = slaveAssignmentMap.get(hostInstance);
199         Set<Integer> occurenceSet = new HashSet<Integer>();
200 
201         // Each slave should occur only once in the list, since the list is per-node slaves
202         for(Integer slavePartition : slaveAssignment)
203         {
204           AssertJUnit.assertTrue(!occurenceSet.contains(slavePartition));
205           occurenceSet.add(slavePartition);
206 
207           slaveCountMap.put(slavePartition, slaveCountMap.get(slavePartition) + 1);
208         }
209         if(minSlaves > slaveAssignment.size())
210         {
211           minSlaves = slaveAssignment.size();
212         }
213         if(maxSlaves < slaveAssignment.size())
214         {
215           maxSlaves = slaveAssignment.size();
216         }
217       }
218       // check if slave distribution is even
219       AssertJUnit.assertTrue(maxSlaves - minSlaves <= 1);
220       // System.out.println("Slaves: max: "+maxSlaves+" Min:"+ minSlaves);
221 
222       // for each node, the slave assignment map should cover the masters for exactly replica
223       // times
224       AssertJUnit.assertTrue(slaveCountMap.size() == masterList.size());
225       for(Integer masterPartitionId : masterList)
226       {
227         AssertJUnit.assertTrue(slaveCountMap.get(masterPartitionId) == replicas);
228       }
229     }
230 
231   }
232 
233   public void printStat(Map<String, Object> result)
234   {
235     // print out master distribution
236 
237     // print out slave distribution
238 
239   }
240 
241   public static double [] compareResult(Map<String, Object> result1, Map<String, Object> result2)
242   {
243     double [] result = new double[2];
244     Map<String, List<Integer>> masterAssignmentMap1 = (Map<String, List<Integer>>) (result1.get("MasterAssignmentMap"));
245     Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap1 = (Map<String, Map<String, List<Integer>>>)(result1.get("SlaveAssignmentMap"));
246 
247     Map<String, List<Integer>> masterAssignmentMap2 = (Map<String, List<Integer>>) (result2.get("MasterAssignmentMap"));
248     Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap2 = (Map<String, Map<String, List<Integer>>>)(result2.get("SlaveAssignmentMap"));
249 
250     int commonMasters = 0;
251     int commonSlaves = 0;
252     int partitions = (Integer)(result1.get("partitions"));
253     int replicas = (Integer)(result1.get("replicas"));
254 
255     AssertJUnit.assertTrue((Integer)(result2.get("partitions")) == partitions);
256     AssertJUnit.assertTrue((Integer)(result2.get("replicas")) == replicas);
257 
258     // masterMap1 maps from partition id to the holder instance name
259     Map<Integer, String> masterMap1 = new TreeMap<Integer, String>();
260     for(String instanceName : masterAssignmentMap1.keySet())
261     {
262       List<Integer> masterList1 = masterAssignmentMap1.get(instanceName);
263       for(Integer partition : masterList1)
264       {
265         AssertJUnit.assertTrue(!masterMap1.containsKey(partition));
266         masterMap1.put(partition, instanceName);
267       }
268     }
269     // go through masterAssignmentMap2 and find out the common number
270     for(String instanceName : masterAssignmentMap2.keySet())
271     {
272       List<Integer> masterList2 = masterAssignmentMap2.get(instanceName);
273       for(Integer partition : masterList2)
274       {
275         if(masterMap1.get(partition).equalsIgnoreCase(instanceName))
276         {
277           commonMasters ++;
278         }
279       }
280     }
281     
282     result[0] = 1.0*commonMasters/partitions;
283     System.out.println(commonMasters + " master partitions are kept, "+ (partitions - commonMasters) + " moved, keep ratio:" + 1.0*commonMasters/partitions);
284     
285     // maps from the partition id to the instance names that holds its slave partition
286     Map<Integer, Set<String>> slaveMap1 = new TreeMap<Integer, Set<String>>();
287     for(String instanceName : nodeSlaveAssignmentMap1.keySet())
288     {
289       Map<String, List<Integer>> slaveAssignment1 = nodeSlaveAssignmentMap1.get(instanceName);
290       for(String slaveHostName : slaveAssignment1.keySet())
291       {
292         List<Integer> slaveList = slaveAssignment1.get(slaveHostName);
293         for(Integer partition : slaveList)
294         {
295           if(!slaveMap1.containsKey(partition))
296           {
297             slaveMap1.put(partition, new TreeSet<String>());
298           }
299           AssertJUnit.assertTrue(!slaveMap1.get(partition).contains(slaveHostName));
300           slaveMap1.get(partition).add(slaveHostName);
301         }
302       }
303     }
304 
305     for(String instanceName : nodeSlaveAssignmentMap2.keySet())
306     {
307       Map<String, List<Integer>> slaveAssignment2 = nodeSlaveAssignmentMap2.get(instanceName);
308       for(String slaveHostName : slaveAssignment2.keySet())
309       {
310         List<Integer> slaveList = slaveAssignment2.get(slaveHostName);
311         for(Integer partition : slaveList)
312         {
313           if(slaveMap1.get(partition).contains(slaveHostName))
314           {
315             commonSlaves++;
316           }
317         }
318       }
319     }
320     result[1] = 1.0*commonSlaves/partitions/replicas;
321     System.out.println(commonSlaves + " slave partitions are kept, " + (partitions * replicas - commonSlaves)+ " moved. keep ratio:"+1.0*commonSlaves/partitions/replicas);
322     return result;
323   }
324 
325 }