View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Date;
24  import java.util.List;
25  
26  import org.I0Itec.zkclient.DataUpdater;
27  import org.apache.helix.AccessOption;
28  import org.apache.helix.BaseDataAccessor;
29  import org.apache.helix.PropertyPathConfig;
30  import org.apache.helix.PropertyType;
31  import org.apache.helix.ZNRecord;
32  import org.apache.helix.ZNRecordUpdater;
33  import org.apache.helix.ZkUnitTestBase;
34  import org.apache.helix.manager.zk.ZNRecordSerializer;
35  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
36  import org.apache.helix.manager.zk.ZkClient;
37  import org.apache.zookeeper.data.Stat;
38  import org.testng.Assert;
39  import org.testng.annotations.Test;
40  
41  
42  public class TestZkBaseDataAccessor extends ZkUnitTestBase
43  {
44    @Test
45    public void testSyncZkBaseDataAccessor()
46    {
47      System.out.println("START TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
48  
49      String root = "TestZkBaseDataAccessor_syn";
50      ZkClient zkClient = new ZkClient(ZK_ADDR);
51      zkClient.setZkSerializer(new ZNRecordSerializer());
52      zkClient.deleteRecursive("/" + root);
53  
54      BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
55  
56      // test sync create
57      for (int i = 0; i < 10; i++)
58      {
59        String msgId = "msg_" + i;
60        String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
61        boolean success = accessor.create(path, new ZNRecord(msgId), AccessOption.PERSISTENT);
62        Assert.assertTrue(success, "Should succeed in create");
63      }
64  
65      // test get what we created
66      for (int i = 0; i < 10; i++)
67      {
68        String msgId = "msg_" + i;
69        String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
70        ZNRecord record = zkClient.readData(path);
71        Assert.assertEquals(record.getId(), msgId, "Should get what we created");
72      }
73  
74      // test sync set
75      for (int i = 0; i < 10; i++)
76      {
77        String msgId = "msg_" + i;
78        String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
79        ZNRecord newRecord = new ZNRecord(msgId);
80        newRecord.setSimpleField("key1", "value1");
81        boolean success = accessor.set(path, newRecord, AccessOption.PERSISTENT);
82        Assert.assertTrue(success, "Should succeed in set");
83      }
84  
85      // test get what we set
86      for (int i = 0; i < 10; i++)
87      {
88        String msgId = "msg_" + i;
89        String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
90        ZNRecord record = zkClient.readData(path);
91        Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field set");
92        Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
93      }
94      
95      // test sync update
96      for (int i = 0; i < 10; i++)
97      {
98        String msgId = "msg_" + i;
99        String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
100       ZNRecord newRecord = new ZNRecord(msgId);
101       newRecord.setSimpleField("key2", "value2");
102       boolean success = accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
103       Assert.assertTrue(success, "Should succeed in update");
104     }
105 
106     // test get what we updated
107     for (int i = 0; i < 10; i++)
108     {
109       String msgId = "msg_" + i;
110       String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
111       ZNRecord record = zkClient.readData(path);
112       Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
113       Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
114     }
115     
116     // test sync get
117     for (int i = 0; i < 10; i++)
118     {
119       String msgId = "msg_" + i;
120       String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
121       ZNRecord record = accessor.get(path, null, 0);
122       Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
123       Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
124       Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
125     }
126 
127     // test sync exist
128     for (int i = 0; i < 10; i++)
129     {
130       String msgId = "msg_" + i;
131       String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
132       boolean exists = accessor.exists(path, 0);
133       Assert.assertTrue(exists, "Should exist");
134     }
135 
136     // test getStat()
137     for (int i = 0; i < 10; i++)
138     {
139       String msgId = "msg_" + i;
140       String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
141       Stat stat = accessor.getStat(path, 0);
142       Assert.assertNotNull(stat, "Stat should exist");
143       Assert.assertEquals(stat.getVersion(), 2, "DataVersion should be 2, since we set 1 and update 1");
144     }
145 
146     // test sync remove
147     for (int i = 0; i < 10; i++)
148     {
149       String msgId = "msg_" + i;
150       String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
151       boolean success = accessor.remove(path, 0);
152       Assert.assertTrue(success, "Should remove");
153     }
154     
155     // test get what we removed
156     for (int i = 0; i < 10; i++)
157     {
158       String msgId = "msg_" + i;
159       String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_0", msgId);
160       boolean exists = zkClient.exists(path);
161       Assert.assertFalse(exists, "Should be removed");
162     }
163 
164     zkClient.close();
165     System.out.println("END TestZkBaseDataAccessor.sync at " + new Date(System.currentTimeMillis()));
166   }
167   
168   @Test
169   public void testAsyncZkBaseDataAccessor()
170   {
171     System.out.println("START TestZkBaseDataAccessor.async at " + new Date(System.currentTimeMillis()));
172 
173     String root = "TestZkBaseDataAccessor_asyn";
174     ZkClient zkClient = new ZkClient(ZK_ADDR);
175     zkClient.setZkSerializer(new ZNRecordSerializer());
176     zkClient.deleteRecursive("/" + root);
177 
178     ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
179     
180     // test async createChildren
181     String parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
182     List<ZNRecord> records = new ArrayList<ZNRecord>();
183     List<String> paths = new  ArrayList<String>();
184     for (int i = 0; i < 10; i++)
185     {
186       String msgId = "msg_" + i;
187       paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
188       records.add(new ZNRecord(msgId));
189     }
190     boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
191     for (int i = 0; i < 10; i++)
192     {
193       String msgId = "msg_" + i;
194       Assert.assertTrue(success[i], "Should succeed in create " + msgId);
195     }
196 
197     // test get what we created
198     for (int i = 0; i < 10; i++)
199     {
200       String msgId = "msg_" + i;
201       String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
202       ZNRecord record = zkClient.readData(path);
203       Assert.assertEquals(record.getId(), msgId, "Should get what we created");
204     }
205 
206     // test async setChildren
207     parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
208     records = new ArrayList<ZNRecord>();
209     paths = new  ArrayList<String>();
210     for (int i = 0; i < 10; i++)
211     {
212       String msgId = "msg_" + i;
213       paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
214       ZNRecord newRecord = new ZNRecord(msgId);
215       newRecord.setSimpleField("key1", "value1");
216       records.add(newRecord);
217     }
218     success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
219     for (int i = 0; i < 10; i++)
220     {
221       String msgId = "msg_" + i;
222       Assert.assertTrue(success[i], "Should succeed in set " + msgId);
223     }
224 
225     // test get what we set
226     for (int i = 0; i < 10; i++)
227     {
228       String msgId = "msg_" + i;
229       String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
230       ZNRecord record = zkClient.readData(path);
231       Assert.assertEquals(record.getSimpleFields().size(), 1, "Should have 1 simple field set");
232       Assert.assertEquals(record.getSimpleField("key1"), "value1", "Should have value1 set");
233     }
234     
235     // test async updateChildren
236     parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
237 //    records = new ArrayList<ZNRecord>();
238     List<DataUpdater<ZNRecord>> znrecordUpdaters = new ArrayList<DataUpdater<ZNRecord>>();
239     paths = new ArrayList<String>();
240     for (int i = 0; i < 10; i++)
241     {
242       String msgId = "msg_" + i;
243       paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1",msgId));
244       ZNRecord newRecord = new ZNRecord(msgId);
245       newRecord.setSimpleField("key2", "value2");
246 //      records.add(newRecord);
247       znrecordUpdaters.add(new ZNRecordUpdater(newRecord));
248     }
249     success = accessor.updateChildren(paths, znrecordUpdaters, AccessOption.PERSISTENT);
250     for (int i = 0; i < 10; i++)
251     {
252       String msgId = "msg_" + i;
253       Assert.assertTrue(success[i], "Should succeed in update " + msgId);
254     }
255 
256     // test get what we updated
257     for (int i = 0; i < 10; i++)
258     {
259       String msgId = "msg_" + i;
260       String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
261       ZNRecord record = zkClient.readData(path);
262       Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
263       Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
264     }
265 
266     // test async getChildren
267     parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
268     records = accessor.getChildren(parentPath, null, 0);
269     for (int i = 0; i < 10; i++)
270     {
271       String msgId = "msg_" + i;
272       ZNRecord record = records.get(i);
273       Assert.assertEquals(record.getId(), msgId, "Should get what we updated");
274       Assert.assertEquals(record.getSimpleFields().size(), 2, "Should have 2 simple fields set");
275       Assert.assertEquals(record.getSimpleField("key2"), "value2", "Should have value2 set");
276     }
277 
278     // test async exists
279     parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
280     paths = new ArrayList<String>();
281     for (int i = 0; i < 10; i++)
282     {
283       String msgId = "msg_" + i;
284       paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
285     }
286     boolean[] exists = accessor.exists(paths, 0);
287     for (int i = 0; i < 10; i++)
288     {
289       String msgId = "msg_" + i;
290       Assert.assertTrue(exists[i], "Should exist " + msgId);
291     }
292 
293     // test async getStats
294     parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
295     paths = new ArrayList<String>();
296     for (int i = 0; i < 10; i++)
297     {
298       String msgId = "msg_" + i;
299       paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
300     }
301     Stat[] stats = accessor.getStats(paths, 0);
302     for (int i = 0; i < 10; i++)
303     {
304       String msgId = "msg_" + i;
305       Assert.assertNotNull(stats[i], "Stat should exist for " + msgId);
306       Assert.assertEquals(stats[i].getVersion(), 2, "DataVersion should be 2, since we set 1 and update 1 for " + msgId);
307     }
308 
309     // test async remove
310     parentPath = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1");
311     paths = new ArrayList<String>();
312     for (int i = 0; i < 10; i++)
313     {
314       String msgId = "msg_" + i;
315       paths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId));
316     }
317     success = accessor.remove(paths, 0);
318     for (int i = 0; i < 10; i++)
319     {
320       String msgId = "msg_" + i;
321       Assert.assertTrue(success[i], "Should succeed in remove " + msgId);
322     }
323     
324     // test get what we removed
325     for (int i = 0; i < 10; i++)
326     {
327       String msgId = "msg_" + i;
328       String path = PropertyPathConfig.getPath(PropertyType.MESSAGES, root, "host_1", msgId);
329       boolean pathExists = zkClient.exists(path);
330       Assert.assertFalse(pathExists, "Should be removed " + msgId);
331     }
332 
333     zkClient.close();
334     System.out.println("END TestZkBaseDataAccessor.async at " + new Date(System.currentTimeMillis()));
335   }
336  
337 }