1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.ByteArrayInputStream;
23 import java.io.StringWriter;
24 import java.util.List;
25 import java.util.Map;
26
27 import org.I0Itec.zkclient.serialize.ZkSerializer;
28 import org.apache.helix.HelixException;
29 import org.apache.helix.ZNRecord;
30 import org.apache.log4j.Logger;
31 import org.codehaus.jackson.map.DeserializationConfig;
32 import org.codehaus.jackson.map.ObjectMapper;
33 import org.codehaus.jackson.map.SerializationConfig;
34
35
36 public class ZNRecordSerializer implements ZkSerializer
37 {
38 private static Logger logger = Logger.getLogger(ZNRecordSerializer.class);
39
40 private static int getListFieldBound(ZNRecord record)
41 {
42 int max = Integer.MAX_VALUE;
43 if (record.getSimpleFields().containsKey(ZNRecord.LIST_FIELD_BOUND))
44 {
45 String maxStr = record.getSimpleField(ZNRecord.LIST_FIELD_BOUND);
46 try
47 {
48 max = Integer.parseInt(maxStr);
49 }
50 catch (Exception e)
51 {
52 logger.error("IllegalNumberFormat for list field bound: " + maxStr);
53 }
54 }
55 return max;
56 }
57
58 @Override
59 public byte[] serialize(Object data)
60 {
61 if (!(data instanceof ZNRecord))
62 {
63
64 logger.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk");
65 throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
66 }
67
68 ZNRecord record = (ZNRecord) data;
69
70
71 int max = getListFieldBound(record);
72 if (max < Integer.MAX_VALUE)
73 {
74 Map<String, List<String>> listMap = record.getListFields();
75 for (String key : listMap.keySet())
76 {
77 List<String> list = listMap.get(key);
78 if (list.size() > max)
79 {
80 listMap.put(key, list.subList(0, max));
81 }
82 }
83 }
84
85
86 ObjectMapper mapper = new ObjectMapper();
87 SerializationConfig serializationConfig = mapper.getSerializationConfig();
88 serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
89 serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
90 serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
91 StringWriter sw = new StringWriter();
92 try
93 {
94 mapper.writeValue(sw, data);
95 } catch (Exception e)
96 {
97 logger.error("Exception during data serialization. Will not write to zk. Data (first 1k): "
98 + sw.toString().substring(0, 1024), e);
99 throw new HelixException(e);
100 }
101
102 if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT)
103 {
104 logger.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
105 + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024));
106 throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId());
107 }
108 return sw.toString().getBytes();
109 }
110
111 @Override
112 public Object deserialize(byte[] bytes)
113 {
114 if (bytes == null || bytes.length == 0)
115 {
116
117 return null;
118 }
119
120 ObjectMapper mapper = new ObjectMapper();
121 ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
122
123 DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
124 deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
125 deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
126 deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
127 try
128 {
129 ZNRecord zn = mapper.readValue(bais, ZNRecord.class);
130 return zn;
131 } catch (Exception e)
132 {
133 logger.error("Exception during deserialization of bytes: " + new String(bytes), e);
134 return null;
135 }
136 }
137 }