View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Date;
25  import java.util.List;
26  
27  import org.I0Itec.zkclient.DataUpdater;
28  import org.apache.helix.AccessOption;
29  import org.apache.helix.PropertyPathConfig;
30  import org.apache.helix.PropertyType;
31  import org.apache.helix.TestHelper;
32  import org.apache.helix.ZNRecord;
33  import org.apache.helix.ZNRecordUpdater;
34  import org.apache.helix.ZkUnitTestBase;
35  import org.apache.helix.manager.zk.ZNRecordSerializer;
36  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
37  import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
38  import org.apache.helix.manager.zk.ZkClient;
39  import org.testng.Assert;
40  import org.testng.annotations.Test;
41  
42  
43  public class TestZkCacheAsyncOpSingleThread extends ZkUnitTestBase
44  {
45    @Test
46    public void testHappyPathExtOpZkCacheBaseDataAccessor() throws Exception
47    {
48      String className = TestHelper.getTestClassName();
49      String methodName = TestHelper.getTestMethodName();
50      String clusterName = className + "_" + methodName;
51      System.out.println("START " + clusterName + " at "
52          + new Date(System.currentTimeMillis()));
53  
54      // init external base data accessor
55      ZkClient extZkclient = new ZkClient(ZK_ADDR);
56      extZkclient.setZkSerializer(new ZNRecordSerializer());
57      ZkBaseDataAccessor<ZNRecord> extBaseAccessor =
58          new ZkBaseDataAccessor<ZNRecord>(extZkclient);
59  
60      // init zkCacheBaseDataAccessor
61      String curStatePath =
62          PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
63                                     clusterName,
64                                     "localhost_8901");
65      String extViewPath =
66          PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
67  
68      ZkBaseDataAccessor<ZNRecord> baseAccessor =
69          new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
70  
71      extBaseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
72  
73      List<String> zkCacheInitPaths = Arrays.asList(curStatePath, extViewPath);
74      ZkCacheBaseDataAccessor<ZNRecord> accessor =
75          new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, zkCacheInitPaths);
76  
77      // TestHelper.printCache(accessor._zkCache);
78      boolean ret =
79          TestHelper.verifyZkCache(zkCacheInitPaths,
80                                   accessor._zkCache._cache,
81                                   _gZkClient,
82                                   true);
83      Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
84  
85      // create 10 current states using external base accessor
86      List<String> paths = new ArrayList<String>();
87      List<ZNRecord> records = new ArrayList<ZNRecord>();
88      for (int i = 0; i < 10; i++)
89      {
90        String path =
91            PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
92                                       clusterName,
93                                       "localhost_8901",
94                                       "session_0",
95                                       "TestDB" + i);
96        ZNRecord record = new ZNRecord("TestDB" + i);
97  
98        paths.add(path);
99        records.add(record);
100     }
101 
102     boolean[] success = extBaseAccessor.createChildren(paths, records, AccessOption.PERSISTENT);
103     for (int i = 0; i < 10; i++)
104     {
105       Assert.assertTrue(success[i], "Should succeed in create: " + paths.get(i));
106     }
107 
108     // wait zkEventThread update zkCache
109     // verify wtCache
110     for (int i = 0; i < 10; i++)
111     {
112         // TestHelper.printCache(accessor._zkCache);
113     	ret =
114         TestHelper.verifyZkCache(zkCacheInitPaths,
115                                  accessor._zkCache._cache,
116                                  _gZkClient,
117                                  true);
118     	if (ret == true)
119     		break;
120         Thread.sleep(100);
121     }
122     
123     // System.out.println("ret: " + ret);
124     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
125 
126     // update each current state 10 times by external base accessor
127     List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
128     for (int j = 0; j < 10; j++)
129     {
130       paths.clear();
131       updaters.clear();
132       for (int i = 0; i < 10; i++)
133       {
134         String path = curStatePath + "/session_0/TestDB" + i;
135         ZNRecord newRecord = new ZNRecord("TestDB" + i);
136         newRecord.setSimpleField("" + j, "" + j);
137         DataUpdater<ZNRecord> updater = new ZNRecordUpdater(newRecord);
138         paths.add(path);
139         updaters.add(updater);
140       }
141       success = extBaseAccessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
142 
143       for (int i = 0; i < 10; i++)
144       {
145         Assert.assertTrue(success[i], "Should succeed in update: " + paths.get(i));
146       }
147     }
148 
149     // wait zkEventThread update zkCache
150     Thread.sleep(100);
151 
152     // verify cache
153     // TestHelper.printCache(accessor._zkCache);
154     ret =
155         TestHelper.verifyZkCache(zkCacheInitPaths,
156                                  accessor._zkCache._cache,
157                                  _gZkClient,
158                                  true);
159     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
160 
161     // set 10 external views by external accessor
162     paths.clear();
163     records.clear();
164     for (int i = 0; i < 10; i++)
165     {
166       String path =
167           PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
168       ZNRecord record = new ZNRecord("TestDB" + i);
169 
170       paths.add(path);
171       records.add(record);
172     }
173     success = extBaseAccessor.setChildren(paths, records, AccessOption.PERSISTENT);
174     for (int i = 0; i < 10; i++)
175     {
176       Assert.assertTrue(success[i], "Should succeed in set: " + paths.get(i));
177     }
178 
179     // wait zkEventThread update zkCache
180     Thread.sleep(100);
181 
182     // verify cache
183     // TestHelper.printCache(accessor._zkCache._cache);
184     ret =
185         TestHelper.verifyZkCache(zkCacheInitPaths,
186                                  accessor._zkCache._cache,
187                                  _gZkClient,
188                                  true);
189     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
190 
191     // remove 10 external views by external accessor
192     paths.clear();
193     for (int i = 0; i < 10; i++)
194     {
195       String path =
196           PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB" + i);
197 
198       paths.add(path);
199     }
200     success = extBaseAccessor.remove(paths, 0);
201     for (int i = 0; i < 10; i++)
202     {
203       Assert.assertTrue(success[i], "Should succeed in remove: " + paths.get(i));
204     }
205 
206     // wait zkEventThread update zkCache
207     Thread.sleep(100);
208 
209     // verify cache
210     // TestHelper.printCache(accessor._zkCache._cache);
211     ret =
212         TestHelper.verifyZkCache(zkCacheInitPaths,
213                                  accessor._zkCache._cache,
214                                  _gZkClient,
215                                  true);
216     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
217 
218     // clean up
219     extZkclient.close();
220     System.out.println("END " + clusterName + " at "
221         + new Date(System.currentTimeMillis()));
222 
223   }
224 
225   @Test
226   public void testHappyPathSelfOpZkCacheBaseDataAccessor() throws Exception
227   {
228     String className = TestHelper.getTestClassName();
229     String methodName = TestHelper.getTestMethodName();
230     String clusterName = className + "_" + methodName;
231     System.out.println("START " + clusterName + " at "
232         + new Date(System.currentTimeMillis()));
233 
234     // init zkCacheDataAccessor
235     String curStatePath =
236         PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
237                                    clusterName,
238                                    "localhost_8901");
239     String extViewPath =
240         PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
241 
242     ZkBaseDataAccessor<ZNRecord> baseAccessor =
243         new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
244 
245     baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
246 
247     List<String> zkCacheInitPaths = Arrays.asList(curStatePath, extViewPath);
248     ZkCacheBaseDataAccessor<ZNRecord> accessor =
249         new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, zkCacheInitPaths);
250 
251     // TestHelper.printCache(accessor._zkCache._cache);
252     boolean ret =
253         TestHelper.verifyZkCache(zkCacheInitPaths,
254                                  accessor._zkCache._cache,
255                                  _gZkClient,
256                                  true);
257     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
258 
259     // create 10 current states using this accessor
260     List<String> paths = new ArrayList<String>();
261     List<ZNRecord> records = new ArrayList<ZNRecord>();
262     for (int i = 0; i < 10; i++)
263     {
264       String path =
265           PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
266                                      clusterName,
267                                      "localhost_8901",
268                                      "session_0",
269                                      "TestDB" + i);
270       ZNRecord record = new ZNRecord("TestDB" + i);
271 
272       paths.add(path);
273       records.add(record);
274     }
275 
276     boolean[] success = accessor.createChildren(paths, records, AccessOption.PERSISTENT);
277     for (int i = 0; i < 10; i++)
278     {
279       Assert.assertTrue(success[i], "Should succeed in create: " + paths.get(i));
280     }
281 
282     // verify cache
283 //    TestHelper.printCache(accessor._zkCache._cache);
284     ret =
285         TestHelper.verifyZkCache(zkCacheInitPaths,
286                                  accessor._zkCache._cache,
287                                  _gZkClient,
288                                  false);
289     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
290 
291     // update each current state 10 times by this accessor
292     List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
293     for (int j = 0; j < 10; j++)
294     {
295       paths.clear();
296       updaters.clear();
297       for (int i = 0; i < 10; i++)
298       {
299         String path = curStatePath + "/session_0/TestDB" + i;
300         ZNRecord newRecord = new ZNRecord("TestDB" + i);
301         newRecord.setSimpleField("" + j, "" + j);
302         DataUpdater<ZNRecord> updater = new ZNRecordUpdater(newRecord);
303         paths.add(path);
304         updaters.add(updater);
305       }
306       success = accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
307 
308       for (int i = 0; i < 10; i++)
309       {
310         Assert.assertTrue(success[i], "Should succeed in update: " + paths.get(i));
311       }
312     }
313 
314     // verify cache
315     // TestHelper.printCache(accessor._zkCache._cache);
316     ret =
317         TestHelper.verifyZkCache(zkCacheInitPaths,
318                                  zkCacheInitPaths,
319                                  accessor._zkCache._cache,
320                                  _gZkClient,
321                                  true);
322     // ret = TestHelper.verifyZkCache(zkCacheInitPaths, accessor, _gZkClient, true);
323     // System.out.println("ret: " + ret);
324     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
325 
326     // set 10 external views 10 times by this accessor
327     paths.clear();
328     records.clear();
329     for (int j = 0; j < 10; j++)
330     {
331       for (int i = 0; i < 10; i++)
332       {
333         String path =
334             PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName, "TestDB"
335                 + i);
336         ZNRecord record = new ZNRecord("TestDB" + i);
337         record.setSimpleField("setKey", "" + j);
338 
339         paths.add(path);
340         records.add(record);
341       }
342       success = accessor.setChildren(paths, records, AccessOption.PERSISTENT);
343       for (int i = 0; i < 10; i++)
344       {
345         Assert.assertTrue(success[i], "Should succeed in set: " + paths.get(i));
346       }
347     }
348 
349     // verify cache
350     // TestHelper.printCache(accessor._zkCache._cache);
351     ret =
352         TestHelper.verifyZkCache(zkCacheInitPaths,
353                                  accessor._zkCache._cache,
354                                  _gZkClient,
355                                  true);
356     // System.out.println("ret: " + ret);
357     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
358 
359     // get 10 external views
360     paths.clear();
361     records.clear();
362     for (int i = 0; i < 10; i++)
363     {
364       String path = extViewPath + "/TestDB" + i;
365       paths.add(path);
366     }
367 
368     records = accessor.get(paths, null, 0);
369     for (int i = 0; i < 10; i++)
370     {
371       Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
372     }
373 
374     // getChildren
375     records.clear();
376     records = accessor.getChildren(extViewPath, null, 0);
377     for (int i = 0; i < 10; i++)
378     {
379       Assert.assertEquals(records.get(i).getId(), "TestDB" + i);
380     }
381 
382     // // exists
383     paths.clear();
384     for (int i = 0; i < 10; i++)
385     {
386       String path = curStatePath + "/session_0/TestDB" + i;
387       // // PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
388       // // clusterName,
389       // // "localhost_8901",
390       // // "session_0",
391       // // "TestDB" + i);
392       paths.add(path);
393     }
394     success = accessor.exists(paths, 0);
395     for (int i = 0; i < 10; i++)
396     {
397       Assert.assertTrue(success[i], "Should exits: " + paths.get(i));
398     }
399 
400     System.out.println("END " + clusterName + " at "
401         + new Date(System.currentTimeMillis()));
402 
403   }
404 
405 }