1 package org.apache.helix;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.HashMap;
23 import java.util.Map;
24
25 import org.apache.log4j.Logger;
26
27 public class ZNRecordBucketizer
28 {
29 private static Logger LOG = Logger.getLogger(ZNRecordBucketizer.class);
30 final int _bucketSize;
31
32 public ZNRecordBucketizer(int bucketSize)
33 {
34 if (bucketSize <= 0)
35 {
36 LOG.debug("bucketSize <= 0 (was " + bucketSize
37 + "). Set to 0 to use non-bucketized HelixProperty.");
38 bucketSize = 0;
39 }
40
41 _bucketSize = bucketSize;
42 }
43
44
45
46
47
48
49
50 public String getBucketName(String key)
51 {
52 if (_bucketSize == 0)
53 {
54
55 return null;
56 }
57
58 int idx = key.lastIndexOf('_');
59 if (idx < 0)
60 {
61 throw new IllegalArgumentException("Could NOT find partition# in " + key
62 + ". partitionName should be in format of resourceName_partition#");
63 }
64
65 try
66 {
67 int partitionNb = Integer.parseInt(key.substring(idx + 1));
68 int bucketNb = partitionNb / _bucketSize;
69 int startPartition = bucketNb * _bucketSize;
70 int endPartition = bucketNb * _bucketSize + (_bucketSize - 1);
71 return key.substring(0, idx) + "_p" + startPartition + "-p" + endPartition;
72 }
73 catch (NumberFormatException e)
74 {
75 throw new IllegalArgumentException("Could NOT parse partition# ("
76 + key.substring(idx + 1) + ") in " + key);
77 }
78 }
79
80 public Map<String, ZNRecord> bucketize(ZNRecord record)
81 {
82 Map<String, ZNRecord> map = new HashMap<String, ZNRecord>();
83 if (_bucketSize == 0)
84 {
85 map.put(record.getId(), record);
86 return map;
87 }
88
89
90 for (String partitionName : record.getListFields().keySet())
91 {
92 String bucketName = getBucketName(partitionName);
93 if (bucketName != null)
94 {
95 if (!map.containsKey(bucketName))
96 {
97 map.put(bucketName, new ZNRecord(bucketName));
98 }
99 ZNRecord bucketizedRecord = map.get(bucketName);
100 bucketizedRecord.setListField(partitionName, record.getListField(partitionName));
101 }
102 else
103 {
104 LOG.error("Can't bucketize " + partitionName + " in list field");
105 }
106 }
107
108
109 for (String partitionName : record.getMapFields().keySet())
110 {
111 String bucketName = getBucketName(partitionName);
112 if (bucketName != null)
113 {
114 if (!map.containsKey(bucketName))
115 {
116 map.put(bucketName, new ZNRecord(bucketName));
117 }
118 ZNRecord bucketizedRecord = map.get(bucketName);
119 bucketizedRecord.setMapField(partitionName, record.getMapField(partitionName));
120 }
121 else
122 {
123 LOG.error("Can't bucketize " + partitionName + " in map field");
124 }
125 }
126
127
128 for (ZNRecord bucketizedRecord : map.values())
129 {
130 bucketizedRecord.setSimpleFields(record.getSimpleFields());
131 }
132 return map;
133 }
134 }