View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.ByteArrayInputStream;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.TreeMap;
28  
29  import org.I0Itec.zkclient.exception.ZkMarshallingError;
30  import org.I0Itec.zkclient.serialize.ZkSerializer;
31  import org.apache.helix.HelixException;
32  import org.apache.helix.ZNRecord;
33  import org.apache.log4j.Logger;
34  import org.codehaus.jackson.JsonFactory;
35  import org.codehaus.jackson.JsonGenerator;
36  import org.codehaus.jackson.JsonParser;
37  import org.codehaus.jackson.JsonToken;
38  
39  
40  public class ZNRecordStreamingSerializer implements ZkSerializer
41  {
42    private static Logger LOG = Logger.getLogger(ZNRecordStreamingSerializer.class);
43  
44    private static int getListFieldBound(ZNRecord record)
45    {
46      int max = Integer.MAX_VALUE;
47      if (record.getSimpleFields().containsKey(ZNRecord.LIST_FIELD_BOUND))
48      {
49        String maxStr = record.getSimpleField(ZNRecord.LIST_FIELD_BOUND);
50        try
51        {
52          max = Integer.parseInt(maxStr);
53        }
54        catch (Exception e)
55        {
56          LOG.error("IllegalNumberFormat for list field bound: " + maxStr);
57        }
58      }
59      return max;
60    }
61    
62    @Override
63    public byte[] serialize(Object data) throws ZkMarshallingError
64    {
65      if (!(data instanceof ZNRecord))
66      {
67        // null is NOT an instance of any class
68        LOG.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk");
69        throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
70      }
71  
72      // apply retention policy on list field
73      ZNRecord record = (ZNRecord) data;
74      int max = getListFieldBound(record);
75      if (max < Integer.MAX_VALUE)
76      {
77        Map<String, List<String>> listMap = record.getListFields();
78        for (String key : listMap.keySet())
79        {
80          List<String> list = listMap.get(key);
81          if (list.size() > max)
82          {
83            listMap.put(key, list.subList(0, max));
84          }
85        }
86      }
87      
88      StringWriter sw = new StringWriter();
89      try
90      {
91        JsonFactory f = new JsonFactory();
92        JsonGenerator g = f.createJsonGenerator(sw);
93  
94        g.writeStartObject();
95  
96        // write id field
97        g.writeRaw("\n  ");
98        g.writeStringField("id", record.getId());
99  
100       // write simepleFields
101       g.writeRaw("\n  ");
102       g.writeObjectFieldStart("simpleFields");
103       for (String key : record.getSimpleFields().keySet())
104       {
105         g.writeRaw("\n    ");
106         g.writeStringField(key, record.getSimpleField(key));
107       }
108       g.writeRaw("\n  ");
109       g.writeEndObject(); // for simpleFields
110 
111       // write listFields
112       g.writeRaw("\n  ");
113       g.writeObjectFieldStart("listFields");
114       for (String key : record.getListFields().keySet())
115       {
116         // g.writeStringField(key, record.getListField(key).toString());
117 
118         // g.writeObjectFieldStart(key);
119         g.writeRaw("\n    ");
120         g.writeArrayFieldStart(key);
121         List<String> list = record.getListField(key);
122         for (String listValue : list)
123         {
124           g.writeString(listValue);
125         }
126         // g.writeEndObject();
127         g.writeEndArray();
128 
129       }
130       g.writeRaw("\n  ");
131       g.writeEndObject(); // for listFields
132 
133       // write mapFields
134       g.writeRaw("\n  ");
135       g.writeObjectFieldStart("mapFields");
136       for (String key : record.getMapFields().keySet())
137       {
138         // g.writeStringField(key, record.getMapField(key).toString());
139         g.writeRaw("\n    ");
140         g.writeObjectFieldStart(key);
141         Map<String, String> map = record.getMapField(key);
142         for (String mapKey : map.keySet())
143         {
144           g.writeRaw("\n      ");
145           g.writeStringField(mapKey, map.get(mapKey));
146         }
147         g.writeRaw("\n    ");
148         g.writeEndObject();
149 
150       }
151       g.writeRaw("\n  ");
152       g.writeEndObject(); // for mapFields
153 
154       g.writeRaw("\n");
155       g.writeEndObject(); // for whole znrecord
156 
157       // important: will force flushing of output, close underlying output
158       // stream
159       g.close();
160     }
161     catch (Exception e)
162     {
163       LOG.error("Exception during data serialization. Will not write to zk. Data (first 1k): "
164           + sw.toString().substring(0, 1024), e);
165       throw new HelixException(e);
166     }
167     
168     // check size
169     if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT)
170     {
171       LOG.error("Data size larger than 1M, ZNRecord.id: " + record.getId() 
172           + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024));
173       throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId());
174     }
175     
176     return sw.toString().getBytes();
177   }
178 
179   @Override
180   public Object deserialize(byte[] bytes) throws ZkMarshallingError
181   {
182     if (bytes == null || bytes.length == 0)
183     {
184       LOG.error("ZNode is empty.");
185       return null;
186     }
187     
188     ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
189     ZNRecord record = null;
190 
191     try
192     {
193       JsonFactory f = new JsonFactory();
194       JsonParser jp = f.createJsonParser(bais);
195 
196       jp.nextToken(); // will return JsonToken.START_OBJECT (verify?)
197       while (jp.nextToken() != JsonToken.END_OBJECT)
198       {
199         String fieldname = jp.getCurrentName();
200         jp.nextToken(); // move to value, or START_OBJECT/START_ARRAY
201         if ("id".equals(fieldname))
202         { 
203           // contains an object
204           record = new ZNRecord(jp.getText());
205         }
206         else if ("simpleFields".equals(fieldname))
207         {
208           while (jp.nextToken() != JsonToken.END_OBJECT)
209           {
210             String key = jp.getCurrentName();
211             jp.nextToken(); // move to value
212             record.setSimpleField(key, jp.getText());
213           }
214         }
215         else if ("mapFields".equals(fieldname))
216         {
217           // user.setVerified(jp.getCurrentToken() == JsonToken.VALUE_TRUE);
218           while (jp.nextToken() != JsonToken.END_OBJECT)
219           {
220             String key = jp.getCurrentName();
221             record.setMapField(key, new TreeMap<String, String>());
222             jp.nextToken(); // move to value
223 
224             while (jp.nextToken() != JsonToken.END_OBJECT)
225             {
226               String mapKey = jp.getCurrentName();
227               jp.nextToken(); // move to value
228               record.getMapField(key).put(mapKey, jp.getText());
229             }
230           }
231 
232         }
233         else if ("listFields".equals(fieldname))
234         {
235           // user.setUserImage(jp.getBinaryValue());
236           while (jp.nextToken() != JsonToken.END_OBJECT)
237           {
238             String key = jp.getCurrentName();
239             record.setListField(key, new ArrayList<String>());
240             jp.nextToken(); // move to value
241             while (jp.nextToken() != JsonToken.END_ARRAY)
242             {
243               record.getListField(key).add(jp.getText());
244             }
245 
246           }
247 
248         }
249         else
250         {
251           throw new IllegalStateException("Unrecognized field '" + fieldname + "'!");
252         }
253       }
254       jp.close(); // ensure resources get cleaned up timely and properly
255     }
256     catch (Exception e)
257     {
258       LOG.error("Exception during deserialization of bytes: " + new String(bytes), e);
259     }
260 
261     return record;
262   }
263 
264   public static void main(String[] args)
265   {
266     ZNRecord record = new ZNRecord("record");
267     final int recordSize = 10;
268     for (int i = 0; i < recordSize; i++)
269     {
270       record.setSimpleField("" + i, "" + i);
271       record.setListField("" + i, new ArrayList<String>());
272       for (int j = 0; j < recordSize; j++)
273       {
274         record.getListField("" + i).add("" + j);
275       }
276 
277       record.setMapField("" + i, new TreeMap<String, String>());
278       for (int j = 0; j < recordSize; j++)
279       {
280         record.getMapField("" + i).put("" + j, "" + j);
281       }
282     }
283 
284     ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
285     byte[] bytes = serializer.serialize(record);
286     System.out.println(new String(bytes));
287     ZNRecord record2 = (ZNRecord) serializer.deserialize(bytes);
288     System.out.println(record2);
289 
290     long start = System.currentTimeMillis();
291     for (int i = 0; i < 100; i++)
292     {
293       bytes = serializer.serialize(record);
294       // System.out.println(new String(bytes));
295       record2 = (ZNRecord) serializer.deserialize(bytes);
296       // System.out.println(record2);
297     }
298     long end = System.currentTimeMillis();
299     System.out.println("ZNRecordStreamingSerializer time used: " + (end - start));
300 
301     ZNRecordSerializer serializer2 = new ZNRecordSerializer();
302     bytes = serializer2.serialize(record);
303     // System.out.println(new String(bytes));
304     record2 = (ZNRecord) serializer2.deserialize(bytes);
305     // System.out.println(record2);
306 
307     start = System.currentTimeMillis();
308     for (int i = 0; i < 100; i++)
309     {
310       bytes = serializer2.serialize(record);
311       // System.out.println(new String(bytes));
312       record2 = (ZNRecord) serializer2.deserialize(bytes);
313       // System.out.println(record2);
314     }
315     end = System.currentTimeMillis();
316     System.out.println("ZNRecordSerializer time used: " + (end - start));
317 
318   }
319 }