1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Arrays;
23 import java.util.Date;
24
25 import org.apache.helix.HelixException;
26 import org.apache.helix.HelixProperty;
27 import org.apache.helix.ZNRecord;
28 import org.apache.helix.ZkUnitTestBase;
29 import org.apache.helix.PropertyKey.Builder;
30 import org.apache.helix.manager.zk.ZKHelixAdmin;
31 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
32 import org.apache.helix.manager.zk.ZNRecordSerializer;
33 import org.apache.helix.manager.zk.ZNRecordStreamingSerializer;
34 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
35 import org.apache.helix.manager.zk.ZkClient;
36 import org.apache.helix.model.IdealState;
37 import org.apache.helix.model.InstanceConfig;
38 import org.apache.log4j.Logger;
39 import org.testng.Assert;
40 import org.testng.annotations.Test;
41
42
43 public class TestZNRecordSizeLimit extends ZkUnitTestBase
44 {
45 private static Logger LOG = Logger.getLogger(TestZNRecordSizeLimit.class);
46
47 @Test
48 public void testZNRecordSizeLimitUseZNRecordSerializer()
49 {
50 String className = getShortClassName();
51 System.out.println("START testZNRecordSizeLimitUseZNRecordSerializer at "
52 + new Date(System.currentTimeMillis()));
53
54 ZNRecordSerializer serializer = new ZNRecordSerializer();
55 ZkClient zkClient = new ZkClient(ZK_ADDR);
56 zkClient.setZkSerializer(serializer);
57 String root = className;
58 byte[] buf = new byte[1024];
59 for (int i = 0; i < 1024; i++)
60 {
61 buf[i] = 'a';
62 }
63 String bufStr = new String(buf);
64
65
66
67
68 final ZNRecord smallRecord = new ZNRecord("normalsize");
69 smallRecord.getSimpleFields().clear();
70 for (int i = 0; i < 900; i++)
71 {
72 smallRecord.setSimpleField(i + "", bufStr);
73 }
74
75 String path1 = "/" + root + "/test1";
76 zkClient.createPersistent(path1, true);
77 zkClient.writeData(path1, smallRecord);
78
79 ZNRecord record = zkClient.readData(path1);
80 Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
81
82
83
84 final ZNRecord largeRecord = new ZNRecord("oversize");
85 largeRecord.getSimpleFields().clear();
86 for (int i = 0; i < 1024; i++)
87 {
88 largeRecord.setSimpleField(i + "", bufStr);
89 }
90 String path2 = "/" + root + "/test2";
91 zkClient.createPersistent(path2, true);
92 try
93 {
94 zkClient.writeData(path2, largeRecord);
95 Assert.fail("Should fail because data size is larger than 1M");
96 }
97 catch (HelixException e)
98 {
99
100 }
101 record = zkClient.readData(path2);
102 Assert.assertNull(record);
103
104
105 record = zkClient.readData(path1);
106 try
107 {
108 zkClient.writeData(path1, largeRecord);
109 Assert.fail("Should fail because data size is larger than 1M");
110 }
111 catch (HelixException e)
112 {
113
114 }
115 ZNRecord recordNew = zkClient.readData(path1);
116 byte[] arr = serializer.serialize(record);
117 byte[] arrNew = serializer.serialize(recordNew);
118 Assert.assertTrue(Arrays.equals(arr, arrNew));
119
120
121 ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
122 admin.addCluster(className, true);
123 InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
124 admin.addInstance(className, instanceConfig);
125
126
127 ZKHelixDataAccessor accessor =
128 new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
129 Builder keyBuilder = accessor.keyBuilder();
130
131 IdealState idealState = new IdealState("currentState");
132 idealState.setStateModelDefRef("MasterSlave");
133 idealState.setIdealStateMode("AUTO");
134 idealState.setNumPartitions(10);
135
136 for (int i = 0; i < 1024; i++)
137 {
138 idealState.getRecord().setSimpleField(i + "", bufStr);
139 }
140 boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
141 Assert.assertFalse(succeed);
142 HelixProperty property =
143 accessor.getProperty(keyBuilder.stateTransitionStatus("localhost_12918",
144 "session_1",
145 "partition_1"));
146 Assert.assertNull(property);
147
148
149 idealState.getRecord().getSimpleFields().clear();
150 idealState.setStateModelDefRef("MasterSlave");
151 idealState.setIdealStateMode("AUTO");
152 idealState.setNumPartitions(10);
153
154 for (int i = 0; i < 900; i++)
155 {
156 idealState.getRecord().setSimpleField(i + "", bufStr);
157 }
158 succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
159 Assert.assertTrue(succeed);
160 record =
161 accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
162 Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
163
164
165 idealState.getRecord().getSimpleFields().clear();
166 idealState.setStateModelDefRef("MasterSlave");
167 idealState.setIdealStateMode("AUTO");
168 idealState.setNumPartitions(10);
169 for (int i = 900; i < 1024; i++)
170 {
171 idealState.getRecord().setSimpleField(i + "", bufStr);
172 }
173
174 succeed =
175 accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
176 Assert.assertFalse(succeed);
177 recordNew =
178 accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
179 arr = serializer.serialize(record);
180 arrNew = serializer.serialize(recordNew);
181 Assert.assertTrue(Arrays.equals(arr, arrNew));
182
183 System.out.println("END testZNRecordSizeLimitUseZNRecordSerializer at "
184 + new Date(System.currentTimeMillis()));
185 }
186
187 @Test
188 public void testZNRecordSizeLimitUseZNRecordStreamingSerializer()
189 {
190 String className = getShortClassName();
191 System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
192 + new Date(System.currentTimeMillis()));
193
194 ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
195 ZkClient zkClient = new ZkClient(ZK_ADDR);
196 zkClient.setZkSerializer(serializer);
197 String root = className;
198 byte[] buf = new byte[1024];
199 for (int i = 0; i < 1024; i++)
200 {
201 buf[i] = 'a';
202 }
203 String bufStr = new String(buf);
204
205
206
207
208 final ZNRecord smallRecord = new ZNRecord("normalsize");
209 smallRecord.getSimpleFields().clear();
210 for (int i = 0; i < 900; i++)
211 {
212 smallRecord.setSimpleField(i + "", bufStr);
213 }
214
215 String path1 = "/" + root + "/test1";
216 zkClient.createPersistent(path1, true);
217 zkClient.writeData(path1, smallRecord);
218
219 ZNRecord record = zkClient.readData(path1);
220 Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
221
222
223
224 final ZNRecord largeRecord = new ZNRecord("oversize");
225 largeRecord.getSimpleFields().clear();
226 for (int i = 0; i < 1024; i++)
227 {
228 largeRecord.setSimpleField(i + "", bufStr);
229 }
230 String path2 = "/" + root + "/test2";
231 zkClient.createPersistent(path2, true);
232 try
233 {
234 zkClient.writeData(path2, largeRecord);
235 Assert.fail("Should fail because data size is larger than 1M");
236 }
237 catch (HelixException e)
238 {
239
240 }
241 record = zkClient.readData(path2);
242 Assert.assertNull(record);
243
244
245 record = zkClient.readData(path1);
246 try
247 {
248 zkClient.writeData(path1, largeRecord);
249 Assert.fail("Should fail because data size is larger than 1M");
250 }
251 catch (HelixException e)
252 {
253
254 }
255 ZNRecord recordNew = zkClient.readData(path1);
256 byte[] arr = serializer.serialize(record);
257 byte[] arrNew = serializer.serialize(recordNew);
258 Assert.assertTrue(Arrays.equals(arr, arrNew));
259
260
261 ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
262 admin.addCluster(className, true);
263 InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
264 admin.addInstance(className, instanceConfig);
265
266
267 ZKHelixDataAccessor accessor =
268 new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
269 Builder keyBuilder = accessor.keyBuilder();
270
271
272 IdealState idealState = new IdealState("currentState");
273 idealState.setStateModelDefRef("MasterSlave");
274 idealState.setIdealStateMode("AUTO");
275 idealState.setNumPartitions(10);
276
277 for (int i = 0; i < 1024; i++)
278 {
279 idealState.getRecord().setSimpleField(i + "", bufStr);
280 }
281 boolean succeed =
282 accessor.setProperty(keyBuilder.idealStates("TestDB_1"),
283 idealState);
284 Assert.assertFalse(succeed);
285 HelixProperty property =
286 accessor.getProperty(keyBuilder.idealStates("TestDB_1"));
287 Assert.assertNull(property);
288
289
290 idealState.getRecord().getSimpleFields().clear();
291 idealState.setStateModelDefRef("MasterSlave");
292 idealState.setIdealStateMode("AUTO");
293 idealState.setNumPartitions(10);
294
295 for (int i = 0; i < 900; i++)
296 {
297 idealState.getRecord().setSimpleField(i + "", bufStr);
298 }
299 succeed =
300 accessor.setProperty(keyBuilder.idealStates("TestDB_2"),
301 idealState);
302 Assert.assertTrue(succeed);
303 record =
304 accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
305 Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
306
307
308 idealState.getRecord().getSimpleFields().clear();
309 idealState.setStateModelDefRef("MasterSlave");
310 idealState.setIdealStateMode("AUTO");
311 idealState.setNumPartitions(10);
312
313 for (int i = 900; i < 1024; i++)
314 {
315 idealState.getRecord().setSimpleField(i + "", bufStr);
316 }
317
318 succeed =
319 accessor.updateProperty(keyBuilder.idealStates("TestDB_2"),
320 idealState);
321 Assert.assertFalse(succeed);
322 recordNew =
323 accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
324 arr = serializer.serialize(record);
325 arrNew = serializer.serialize(recordNew);
326 Assert.assertTrue(Arrays.equals(arr, arrNew));
327
328 System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at "
329 + new Date(System.currentTimeMillis()));
330
331 }
332 }