View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Arrays;
23  import java.util.Date;
24  
25  import org.apache.helix.HelixException;
26  import org.apache.helix.HelixProperty;
27  import org.apache.helix.ZNRecord;
28  import org.apache.helix.ZkUnitTestBase;
29  import org.apache.helix.PropertyKey.Builder;
30  import org.apache.helix.manager.zk.ZKHelixAdmin;
31  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
32  import org.apache.helix.manager.zk.ZNRecordSerializer;
33  import org.apache.helix.manager.zk.ZNRecordStreamingSerializer;
34  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
35  import org.apache.helix.manager.zk.ZkClient;
36  import org.apache.helix.model.IdealState;
37  import org.apache.helix.model.InstanceConfig;
38  import org.apache.log4j.Logger;
39  import org.testng.Assert;
40  import org.testng.annotations.Test;
41  
42  
43  public class TestZNRecordSizeLimit extends ZkUnitTestBase
44  {
45    private static Logger LOG = Logger.getLogger(TestZNRecordSizeLimit.class);
46  
47    @Test
48    public void testZNRecordSizeLimitUseZNRecordSerializer()
49    {
50      String className = getShortClassName();
51      System.out.println("START testZNRecordSizeLimitUseZNRecordSerializer at "
52          + new Date(System.currentTimeMillis()));
53  
54      ZNRecordSerializer serializer = new ZNRecordSerializer();
55      ZkClient zkClient = new ZkClient(ZK_ADDR);
56      zkClient.setZkSerializer(serializer);
57      String root = className;
58      byte[] buf = new byte[1024];
59      for (int i = 0; i < 1024; i++)
60      {
61        buf[i] = 'a';
62      }
63      String bufStr = new String(buf);
64  
65      // test zkClient
66      // legal-sized data gets written to zk
67      // write a znode of size less than 1m
68      final ZNRecord smallRecord = new ZNRecord("normalsize");
69      smallRecord.getSimpleFields().clear();
70      for (int i = 0; i < 900; i++)
71      {
72        smallRecord.setSimpleField(i + "", bufStr);
73      }
74  
75      String path1 = "/" + root + "/test1";
76      zkClient.createPersistent(path1, true);
77      zkClient.writeData(path1, smallRecord);
78  
79      ZNRecord record = zkClient.readData(path1);
80      Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
81  
82      // oversized data doesn't create any data on zk
83      // prepare a znode of size larger than 1m
84      final ZNRecord largeRecord = new ZNRecord("oversize");
85      largeRecord.getSimpleFields().clear();
86      for (int i = 0; i < 1024; i++)
87      {
88        largeRecord.setSimpleField(i + "", bufStr);
89      }
90      String path2 = "/" + root + "/test2";
91      zkClient.createPersistent(path2, true);
92      try
93      {
94        zkClient.writeData(path2, largeRecord);
95        Assert.fail("Should fail because data size is larger than 1M");
96      }
97      catch (HelixException e)
98      {
99        // OK
100     }
101     record = zkClient.readData(path2);
102     Assert.assertNull(record);
103 
104     // oversized write doesn't overwrite existing data on zk
105     record = zkClient.readData(path1);
106     try
107     {
108       zkClient.writeData(path1, largeRecord);
109       Assert.fail("Should fail because data size is larger than 1M");
110     }
111     catch (HelixException e)
112     {
113       // OK
114     }
115     ZNRecord recordNew = zkClient.readData(path1);
116     byte[] arr = serializer.serialize(record);
117     byte[] arrNew = serializer.serialize(recordNew);
118     Assert.assertTrue(Arrays.equals(arr, arrNew));
119 
120     // test ZkDataAccessor
121     ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
122     admin.addCluster(className, true);
123     InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
124     admin.addInstance(className, instanceConfig);
125 
126     // oversized data should not create any new data on zk
127     ZKHelixDataAccessor accessor =
128         new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
129     Builder keyBuilder = accessor.keyBuilder();
130 
131     IdealState idealState = new IdealState("currentState");
132     idealState.setStateModelDefRef("MasterSlave");
133     idealState.setIdealStateMode("AUTO");
134     idealState.setNumPartitions(10);
135 
136     for (int i = 0; i < 1024; i++)
137     {
138       idealState.getRecord().setSimpleField(i + "", bufStr);
139     }
140     boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
141     Assert.assertFalse(succeed);
142     HelixProperty property =
143         accessor.getProperty(keyBuilder.stateTransitionStatus("localhost_12918",
144                                                               "session_1",
145                                                               "partition_1"));
146     Assert.assertNull(property);
147 
148     // legal sized data gets written to zk
149     idealState.getRecord().getSimpleFields().clear();
150     idealState.setStateModelDefRef("MasterSlave");
151     idealState.setIdealStateMode("AUTO");
152     idealState.setNumPartitions(10);
153 
154     for (int i = 0; i < 900; i++)
155     {
156       idealState.getRecord().setSimpleField(i + "", bufStr);
157     }
158     succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
159     Assert.assertTrue(succeed);
160     record =
161         accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
162     Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
163 
164     // oversized data should not update existing data on zk
165     idealState.getRecord().getSimpleFields().clear();
166     idealState.setStateModelDefRef("MasterSlave");
167     idealState.setIdealStateMode("AUTO");
168     idealState.setNumPartitions(10);
169     for (int i = 900; i < 1024; i++)
170     {
171       idealState.getRecord().setSimpleField(i + "", bufStr);
172     }
173     // System.out.println("record: " + idealState.getRecord());
174     succeed =
175         accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
176     Assert.assertFalse(succeed);
177     recordNew =
178         accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
179     arr = serializer.serialize(record);
180     arrNew = serializer.serialize(recordNew);
181     Assert.assertTrue(Arrays.equals(arr, arrNew));
182 
183     System.out.println("END testZNRecordSizeLimitUseZNRecordSerializer at "
184         + new Date(System.currentTimeMillis()));
185   }
186 
187   @Test
188   public void testZNRecordSizeLimitUseZNRecordStreamingSerializer()
189   {
190     String className = getShortClassName();
191     System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
192         + new Date(System.currentTimeMillis()));
193 
194     ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
195     ZkClient zkClient = new ZkClient(ZK_ADDR);
196     zkClient.setZkSerializer(serializer);
197     String root = className;
198     byte[] buf = new byte[1024];
199     for (int i = 0; i < 1024; i++)
200     {
201       buf[i] = 'a';
202     }
203     String bufStr = new String(buf);
204 
205     // test zkClient
206     // legal-sized data gets written to zk
207     // write a znode of size less than 1m
208     final ZNRecord smallRecord = new ZNRecord("normalsize");
209     smallRecord.getSimpleFields().clear();
210     for (int i = 0; i < 900; i++)
211     {
212       smallRecord.setSimpleField(i + "", bufStr);
213     }
214 
215     String path1 = "/" + root + "/test1";
216     zkClient.createPersistent(path1, true);
217     zkClient.writeData(path1, smallRecord);
218 
219     ZNRecord record = zkClient.readData(path1);
220     Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
221 
222     // oversized data doesn't create any data on zk
223     // prepare a znode of size larger than 1m
224     final ZNRecord largeRecord = new ZNRecord("oversize");
225     largeRecord.getSimpleFields().clear();
226     for (int i = 0; i < 1024; i++)
227     {
228       largeRecord.setSimpleField(i + "", bufStr);
229     }
230     String path2 = "/" + root + "/test2";
231     zkClient.createPersistent(path2, true);
232     try
233     {
234       zkClient.writeData(path2, largeRecord);
235       Assert.fail("Should fail because data size is larger than 1M");
236     }
237     catch (HelixException e)
238     {
239       // OK
240     }
241     record = zkClient.readData(path2);
242     Assert.assertNull(record);
243 
244     // oversized write doesn't overwrite existing data on zk
245     record = zkClient.readData(path1);
246     try
247     {
248       zkClient.writeData(path1, largeRecord);
249       Assert.fail("Should fail because data size is larger than 1M");
250     }
251     catch (HelixException e)
252     {
253       // OK
254     }
255     ZNRecord recordNew = zkClient.readData(path1);
256     byte[] arr = serializer.serialize(record);
257     byte[] arrNew = serializer.serialize(recordNew);
258     Assert.assertTrue(Arrays.equals(arr, arrNew));
259 
260     // test ZkDataAccessor
261     ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
262     admin.addCluster(className, true);
263     InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
264     admin.addInstance(className, instanceConfig);
265 
266     // oversized data should not create any new data on zk
267     ZKHelixDataAccessor accessor =
268         new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
269     Builder keyBuilder = accessor.keyBuilder();
270 
271 //    ZNRecord statusUpdates = new ZNRecord("statusUpdates");
272     IdealState idealState = new IdealState("currentState");
273     idealState.setStateModelDefRef("MasterSlave");
274     idealState.setIdealStateMode("AUTO");
275     idealState.setNumPartitions(10);
276 
277     for (int i = 0; i < 1024; i++)
278     {
279       idealState.getRecord().setSimpleField(i + "", bufStr);
280     }
281     boolean succeed =
282         accessor.setProperty(keyBuilder.idealStates("TestDB_1"),
283                                                               idealState);
284     Assert.assertFalse(succeed);
285     HelixProperty property =
286         accessor.getProperty(keyBuilder.idealStates("TestDB_1"));
287     Assert.assertNull(property);
288 
289     // legal sized data gets written to zk
290     idealState.getRecord().getSimpleFields().clear();
291     idealState.setStateModelDefRef("MasterSlave");
292     idealState.setIdealStateMode("AUTO");
293     idealState.setNumPartitions(10);
294 
295     for (int i = 0; i < 900; i++)
296     {
297       idealState.getRecord().setSimpleField(i + "", bufStr);
298     }
299     succeed =
300         accessor.setProperty(keyBuilder.idealStates("TestDB_2"),
301                                                               idealState);
302     Assert.assertTrue(succeed);
303     record =
304         accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
305     Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
306 
307     // oversized data should not update existing data on zk
308     idealState.getRecord().getSimpleFields().clear();
309     idealState.setStateModelDefRef("MasterSlave");
310     idealState.setIdealStateMode("AUTO");
311     idealState.setNumPartitions(10);
312 
313     for (int i = 900; i < 1024; i++)
314     {
315       idealState.getRecord().setSimpleField(i + "", bufStr);
316     }
317     // System.out.println("record: " + idealState.getRecord());
318     succeed =
319         accessor.updateProperty(keyBuilder.idealStates("TestDB_2"),
320                                                                  idealState);
321     Assert.assertFalse(succeed);
322     recordNew =
323         accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
324     arr = serializer.serialize(record);
325     arrNew = serializer.serialize(recordNew);
326     Assert.assertTrue(Arrays.equals(arr, arrNew));
327 
328     System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
329         + new Date(System.currentTimeMillis()));
330 
331   }
332 }