View Javadoc

1   package org.apache.helix;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.HashMap;
23  import java.util.Map;
24  
25  import org.apache.log4j.Logger;
26  
27  public class ZNRecordBucketizer
28  {
29    private static Logger LOG = Logger.getLogger(ZNRecordBucketizer.class);
30    final int             _bucketSize;
31  
32    public ZNRecordBucketizer(int bucketSize)
33    {
34      if (bucketSize <= 0)
35      {
36        LOG.debug("bucketSize <= 0 (was " + bucketSize
37            + "). Set to 0 to use non-bucketized HelixProperty.");
38        bucketSize = 0;
39      }
40  
41      _bucketSize = bucketSize;
42    }
43  
44    /**
45     * Calculate bucketName in form of "resourceName_p{startPartition}-p{endPartition}
46     * 
47     * @param partitionName
48     * @return
49     */
50    public String getBucketName(String key)
51    {
52      if (_bucketSize == 0)
53      {
54        // no bucketize
55        return null;
56      }
57  
58      int idx = key.lastIndexOf('_');
59      if (idx < 0)
60      {
61        throw new IllegalArgumentException("Could NOT find partition# in " + key
62            + ". partitionName should be in format of resourceName_partition#");
63      }
64  
65      try
66      {
67        int partitionNb = Integer.parseInt(key.substring(idx + 1));
68        int bucketNb = partitionNb / _bucketSize;
69        int startPartition = bucketNb * _bucketSize;
70        int endPartition = bucketNb * _bucketSize + (_bucketSize - 1);
71        return key.substring(0, idx) + "_p" + startPartition + "-p" + endPartition;
72      }
73      catch (NumberFormatException e)
74      {
75        throw new IllegalArgumentException("Could NOT parse partition# ("
76            + key.substring(idx + 1) + ") in " + key);
77      }
78    }
79  
80    public Map<String, ZNRecord> bucketize(ZNRecord record)
81    {
82      Map<String, ZNRecord> map = new HashMap<String, ZNRecord>();
83      if (_bucketSize == 0)
84      {
85        map.put(record.getId(), record);
86        return map;
87      }
88      
89      // bucketize list field
90      for (String partitionName : record.getListFields().keySet())
91      {
92        String bucketName = getBucketName(partitionName);
93        if (bucketName != null)
94        {
95          if (!map.containsKey(bucketName))
96          {
97            map.put(bucketName, new ZNRecord(bucketName));
98          }
99          ZNRecord bucketizedRecord = map.get(bucketName);
100         bucketizedRecord.setListField(partitionName, record.getListField(partitionName));
101       }
102       else
103       {
104         LOG.error("Can't bucketize " + partitionName + " in list field");
105       }
106     }
107 
108     // bucketize map field
109     for (String partitionName : record.getMapFields().keySet())
110     {
111       String bucketName = getBucketName(partitionName);
112       if (bucketName != null)
113       {
114         if (!map.containsKey(bucketName))
115         {
116           map.put(bucketName, new ZNRecord(bucketName));
117         }
118         ZNRecord bucketizedRecord = map.get(bucketName);
119         bucketizedRecord.setMapField(partitionName, record.getMapField(partitionName));
120       }
121       else
122       {
123         LOG.error("Can't bucketize " + partitionName + " in map field");
124       }
125     }
126     
127     // copy all simple fields
128     for (ZNRecord bucketizedRecord : map.values())
129     {
130       bucketizedRecord.setSimpleFields(record.getSimpleFields());
131     }
132     return map;
133   }
134 }