1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.ByteArrayInputStream;
23 import java.io.StringWriter;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.TreeMap;
28
29 import org.I0Itec.zkclient.exception.ZkMarshallingError;
30 import org.I0Itec.zkclient.serialize.ZkSerializer;
31 import org.apache.helix.HelixException;
32 import org.apache.helix.ZNRecord;
33 import org.apache.log4j.Logger;
34 import org.codehaus.jackson.JsonFactory;
35 import org.codehaus.jackson.JsonGenerator;
36 import org.codehaus.jackson.JsonParser;
37 import org.codehaus.jackson.JsonToken;
38
39
40 public class ZNRecordStreamingSerializer implements ZkSerializer
41 {
42 private static Logger LOG = Logger.getLogger(ZNRecordStreamingSerializer.class);
43
44 private static int getListFieldBound(ZNRecord record)
45 {
46 int max = Integer.MAX_VALUE;
47 if (record.getSimpleFields().containsKey(ZNRecord.LIST_FIELD_BOUND))
48 {
49 String maxStr = record.getSimpleField(ZNRecord.LIST_FIELD_BOUND);
50 try
51 {
52 max = Integer.parseInt(maxStr);
53 }
54 catch (Exception e)
55 {
56 LOG.error("IllegalNumberFormat for list field bound: " + maxStr);
57 }
58 }
59 return max;
60 }
61
62 @Override
63 public byte[] serialize(Object data) throws ZkMarshallingError
64 {
65 if (!(data instanceof ZNRecord))
66 {
67
68 LOG.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk");
69 throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
70 }
71
72
73 ZNRecord record = (ZNRecord) data;
74 int max = getListFieldBound(record);
75 if (max < Integer.MAX_VALUE)
76 {
77 Map<String, List<String>> listMap = record.getListFields();
78 for (String key : listMap.keySet())
79 {
80 List<String> list = listMap.get(key);
81 if (list.size() > max)
82 {
83 listMap.put(key, list.subList(0, max));
84 }
85 }
86 }
87
88 StringWriter sw = new StringWriter();
89 try
90 {
91 JsonFactory f = new JsonFactory();
92 JsonGenerator g = f.createJsonGenerator(sw);
93
94 g.writeStartObject();
95
96
97 g.writeRaw("\n ");
98 g.writeStringField("id", record.getId());
99
100
101 g.writeRaw("\n ");
102 g.writeObjectFieldStart("simpleFields");
103 for (String key : record.getSimpleFields().keySet())
104 {
105 g.writeRaw("\n ");
106 g.writeStringField(key, record.getSimpleField(key));
107 }
108 g.writeRaw("\n ");
109 g.writeEndObject();
110
111
112 g.writeRaw("\n ");
113 g.writeObjectFieldStart("listFields");
114 for (String key : record.getListFields().keySet())
115 {
116
117
118
119 g.writeRaw("\n ");
120 g.writeArrayFieldStart(key);
121 List<String> list = record.getListField(key);
122 for (String listValue : list)
123 {
124 g.writeString(listValue);
125 }
126
127 g.writeEndArray();
128
129 }
130 g.writeRaw("\n ");
131 g.writeEndObject();
132
133
134 g.writeRaw("\n ");
135 g.writeObjectFieldStart("mapFields");
136 for (String key : record.getMapFields().keySet())
137 {
138
139 g.writeRaw("\n ");
140 g.writeObjectFieldStart(key);
141 Map<String, String> map = record.getMapField(key);
142 for (String mapKey : map.keySet())
143 {
144 g.writeRaw("\n ");
145 g.writeStringField(mapKey, map.get(mapKey));
146 }
147 g.writeRaw("\n ");
148 g.writeEndObject();
149
150 }
151 g.writeRaw("\n ");
152 g.writeEndObject();
153
154 g.writeRaw("\n");
155 g.writeEndObject();
156
157
158
159 g.close();
160 }
161 catch (Exception e)
162 {
163 LOG.error("Exception during data serialization. Will not write to zk. Data (first 1k): "
164 + sw.toString().substring(0, 1024), e);
165 throw new HelixException(e);
166 }
167
168
169 if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT)
170 {
171 LOG.error("Data size larger than 1M, ZNRecord.id: " + record.getId()
172 + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024));
173 throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId());
174 }
175
176 return sw.toString().getBytes();
177 }
178
179 @Override
180 public Object deserialize(byte[] bytes) throws ZkMarshallingError
181 {
182 if (bytes == null || bytes.length == 0)
183 {
184 LOG.error("ZNode is empty.");
185 return null;
186 }
187
188 ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
189 ZNRecord record = null;
190
191 try
192 {
193 JsonFactory f = new JsonFactory();
194 JsonParser jp = f.createJsonParser(bais);
195
196 jp.nextToken();
197 while (jp.nextToken() != JsonToken.END_OBJECT)
198 {
199 String fieldname = jp.getCurrentName();
200 jp.nextToken();
201 if ("id".equals(fieldname))
202 {
203
204 record = new ZNRecord(jp.getText());
205 }
206 else if ("simpleFields".equals(fieldname))
207 {
208 while (jp.nextToken() != JsonToken.END_OBJECT)
209 {
210 String key = jp.getCurrentName();
211 jp.nextToken();
212 record.setSimpleField(key, jp.getText());
213 }
214 }
215 else if ("mapFields".equals(fieldname))
216 {
217
218 while (jp.nextToken() != JsonToken.END_OBJECT)
219 {
220 String key = jp.getCurrentName();
221 record.setMapField(key, new TreeMap<String, String>());
222 jp.nextToken();
223
224 while (jp.nextToken() != JsonToken.END_OBJECT)
225 {
226 String mapKey = jp.getCurrentName();
227 jp.nextToken();
228 record.getMapField(key).put(mapKey, jp.getText());
229 }
230 }
231
232 }
233 else if ("listFields".equals(fieldname))
234 {
235
236 while (jp.nextToken() != JsonToken.END_OBJECT)
237 {
238 String key = jp.getCurrentName();
239 record.setListField(key, new ArrayList<String>());
240 jp.nextToken();
241 while (jp.nextToken() != JsonToken.END_ARRAY)
242 {
243 record.getListField(key).add(jp.getText());
244 }
245
246 }
247
248 }
249 else
250 {
251 throw new IllegalStateException("Unrecognized field '" + fieldname + "'!");
252 }
253 }
254 jp.close();
255 }
256 catch (Exception e)
257 {
258 LOG.error("Exception during deserialization of bytes: " + new String(bytes), e);
259 }
260
261 return record;
262 }
263
264 public static void main(String[] args)
265 {
266 ZNRecord record = new ZNRecord("record");
267 final int recordSize = 10;
268 for (int i = 0; i < recordSize; i++)
269 {
270 record.setSimpleField("" + i, "" + i);
271 record.setListField("" + i, new ArrayList<String>());
272 for (int j = 0; j < recordSize; j++)
273 {
274 record.getListField("" + i).add("" + j);
275 }
276
277 record.setMapField("" + i, new TreeMap<String, String>());
278 for (int j = 0; j < recordSize; j++)
279 {
280 record.getMapField("" + i).put("" + j, "" + j);
281 }
282 }
283
284 ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
285 byte[] bytes = serializer.serialize(record);
286 System.out.println(new String(bytes));
287 ZNRecord record2 = (ZNRecord) serializer.deserialize(bytes);
288 System.out.println(record2);
289
290 long start = System.currentTimeMillis();
291 for (int i = 0; i < 100; i++)
292 {
293 bytes = serializer.serialize(record);
294
295 record2 = (ZNRecord) serializer.deserialize(bytes);
296
297 }
298 long end = System.currentTimeMillis();
299 System.out.println("ZNRecordStreamingSerializer time used: " + (end - start));
300
301 ZNRecordSerializer serializer2 = new ZNRecordSerializer();
302 bytes = serializer2.serialize(record);
303
304 record2 = (ZNRecord) serializer2.deserialize(bytes);
305
306
307 start = System.currentTimeMillis();
308 for (int i = 0; i < 100; i++)
309 {
310 bytes = serializer2.serialize(record);
311
312 record2 = (ZNRecord) serializer2.deserialize(bytes);
313
314 }
315 end = System.currentTimeMillis();
316 System.out.println("ZNRecordSerializer time used: " + (end - start));
317
318 }
319 }