View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.ByteArrayInputStream;
23  import java.io.StringWriter;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.I0Itec.zkclient.serialize.ZkSerializer;
28  import org.apache.helix.HelixException;
29  import org.apache.helix.ZNRecord;
30  import org.apache.log4j.Logger;
31  import org.codehaus.jackson.map.DeserializationConfig;
32  import org.codehaus.jackson.map.ObjectMapper;
33  import org.codehaus.jackson.map.SerializationConfig;
34  
35  
36  public class ZNRecordSerializer implements ZkSerializer
37  {
38    private static Logger logger = Logger.getLogger(ZNRecordSerializer.class);
39  
40    private static int getListFieldBound(ZNRecord record)
41    {
42      int max = Integer.MAX_VALUE;
43      if (record.getSimpleFields().containsKey(ZNRecord.LIST_FIELD_BOUND))
44      {
45        String maxStr = record.getSimpleField(ZNRecord.LIST_FIELD_BOUND);
46        try
47        {
48          max = Integer.parseInt(maxStr);
49        }
50        catch (Exception e)
51        {
52          logger.error("IllegalNumberFormat for list field bound: " + maxStr);
53        }
54      }
55      return max;
56    }
57  
58    @Override
59    public byte[] serialize(Object data)
60    {
61      if (!(data instanceof ZNRecord))
62      {
63        // null is NOT an instance of any class
64        logger.error("Input object must be of type ZNRecord but it is " + data + ". Will not write to zk");
65        throw new HelixException("Input object is not of type ZNRecord (was " + data + ")");
66      }
67  
68      ZNRecord record = (ZNRecord) data;
69      
70      // apply retention policy
71      int max = getListFieldBound(record);
72      if (max < Integer.MAX_VALUE)
73      {
74        Map<String, List<String>> listMap = record.getListFields();
75        for (String key : listMap.keySet())
76        {
77          List<String> list = listMap.get(key);
78          if (list.size() > max)
79          {
80            listMap.put(key, list.subList(0, max));
81          }
82        }
83      }
84  
85      // do serialization
86      ObjectMapper mapper = new ObjectMapper();
87      SerializationConfig serializationConfig = mapper.getSerializationConfig();
88      serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true);
89      serializationConfig.set(SerializationConfig.Feature.AUTO_DETECT_FIELDS, true);
90      serializationConfig.set(SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
91      StringWriter sw = new StringWriter();
92      try
93      {
94        mapper.writeValue(sw, data);
95      } catch (Exception e)
96      {
97        logger.error("Exception during data serialization. Will not write to zk. Data (first 1k): "
98            + sw.toString().substring(0, 1024), e);
99        throw new HelixException(e);
100     }
101     
102     if (sw.toString().getBytes().length > ZNRecord.SIZE_LIMIT)
103     {
104       logger.error("Data size larger than 1M, ZNRecord.id: " + record.getId() 
105           + ". Will not write to zk. Data (first 1k): " + sw.toString().substring(0, 1024));
106       throw new HelixException("Data size larger than 1M, ZNRecord.id: " + record.getId());
107     }
108     return sw.toString().getBytes();
109   }
110 
111   @Override
112   public Object deserialize(byte[] bytes)
113   {
114     if (bytes == null || bytes.length == 0)
115     {
116       // reading a parent/null node
117       return null;
118     }
119 
120     ObjectMapper mapper = new ObjectMapper();
121     ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
122 
123     DeserializationConfig deserializationConfig = mapper.getDeserializationConfig();
124     deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_FIELDS, true);
125     deserializationConfig.set(DeserializationConfig.Feature.AUTO_DETECT_SETTERS, true);
126     deserializationConfig.set(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, true);
127     try
128     {
129       ZNRecord zn = mapper.readValue(bais, ZNRecord.class);
130       return zn;
131     } catch (Exception e)
132     {
133       logger.error("Exception during deserialization of bytes: " + new String(bytes), e);
134       return null;
135     }
136   }
137 }