View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Date;
25  import java.util.List;
26  import java.util.concurrent.Callable;
27  
28  import org.I0Itec.zkclient.DataUpdater;
29  import org.apache.helix.AccessOption;
30  import org.apache.helix.PropertyPathConfig;
31  import org.apache.helix.PropertyType;
32  import org.apache.helix.TestHelper;
33  import org.apache.helix.ZNRecord;
34  import org.apache.helix.ZNRecordUpdater;
35  import org.apache.helix.ZkUnitTestBase;
36  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
37  import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
38  import org.testng.Assert;
39  import org.testng.annotations.Test;
40  
41  
42  public class TestWtCacheAsyncOpMultiThread extends ZkUnitTestBase
43  {
44    class TestCreateZkCacheBaseDataAccessor implements Callable<Boolean>
45    {
46      final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
47      final String                         _clusterName;
48      final int _id;
49  
50      public TestCreateZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
51      {
52        _accessor = accessor;
53        _clusterName = clusterName;
54        _id = id;
55      }
56  
57      @Override
58      public Boolean call() throws Exception
59      {
60        // create 10 current states in 2 steps
61        List<String> paths = new ArrayList<String>();
62        List<ZNRecord> records = new ArrayList<ZNRecord>();
63        for (int j = 0; j < 2; j++)
64        {
65          paths.clear();
66          records.clear();
67  
68          if (_id == 1 && j == 0)
69          {
70            // let thread_0 create 0-4
71            Thread.sleep(30);
72          }
73  
74          if (_id == 0 && j == 1)
75          {
76            // let thread_1 create 5-9
77            Thread.sleep(100);
78          }
79          
80          
81          for (int i = 0; i < 5; i++)
82          {
83            int k = j * 5 + i;
84            String path =
85                PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
86                                           _clusterName,
87                                           "localhost_8901",
88                                           "session_0",
89                                           "TestDB" + k);
90            ZNRecord record = new ZNRecord("TestDB" + k);
91  
92            paths.add(path);
93            records.add(record);
94          }
95          
96          boolean[] success = _accessor.createChildren(paths, records, AccessOption.PERSISTENT);
97          // System.out.println("thread-" + _id + " creates " + j  + ": " + Arrays.toString(success));
98          
99          // create all all sync'ed, so we shall see either all true or all false
100         for (int i = 1; i < 5; i++)
101         {
102           Assert.assertEquals(success[i], success[0], "Should be either all succeed of all fail");
103         }
104       }
105 
106       return true;
107     }
108   }
109   
110   class TestUpdateZkCacheBaseDataAccessor implements Callable<Boolean>
111   {
112     final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
113     final String                         _clusterName;
114     final int _id;
115 
116     public TestUpdateZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
117     {
118       _accessor = accessor;
119       _clusterName = clusterName;
120       _id = id;
121     }
122 
123     @Override
124     public Boolean call() throws Exception
125     {
126       // create 10 current states in 2 steps
127       List<String> paths = new ArrayList<String>();
128       List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
129       for (int j = 0; j < 10; j++)
130       {
131         paths.clear();
132         updaters.clear();
133 
134         for (int i = 0; i < 10; i++)
135         {
136           String path =
137               PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
138                                          _clusterName,
139                                          "localhost_8901",
140                                          "session_0",
141                                          "TestDB" + i);
142           
143           ZNRecord newRecord = new ZNRecord("TestDB" + i);
144           newRecord.setSimpleField("" + j, "" + j);
145           DataUpdater<ZNRecord> updater = new ZNRecordUpdater(newRecord);
146           paths.add(path);
147           updaters.add(updater);
148         }
149         
150         boolean[] success = _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
151         // System.out.println("thread-" + _id + " updates " + j + ": " + Arrays.toString(success));
152         
153         for (int i = 0; i < 10; i++)
154         {
155           Assert.assertEquals(success[i], true, "Should be all succeed");
156         }
157       }
158 
159       return true;
160     }
161   }
162   
163   class TestSetZkCacheBaseDataAccessor implements Callable<Boolean>
164   {
165     final ZkCacheBaseDataAccessor<ZNRecord> _accessor;
166     final String                         _clusterName;
167     final int _id;
168 
169     public TestSetZkCacheBaseDataAccessor(ZkCacheBaseDataAccessor<ZNRecord> accessor, String clusterName, int id)
170     {
171       _accessor = accessor;
172       _clusterName = clusterName;
173       _id = id;
174     }
175 
176     @Override
177     public Boolean call() throws Exception
178     {
179       // create 10 current states in 2 steps
180       List<String> paths = new ArrayList<String>();
181       List<ZNRecord> records = new ArrayList<ZNRecord>();
182       for (int j = 0; j < 2; j++)
183       {
184         paths.clear();
185         records.clear();
186 
187         if (_id == 1 && j == 0)
188         {
189           // let thread_0 create 0-4
190           Thread.sleep(30);
191         }
192 
193         if (_id == 0 && j == 1)
194         {
195           // let thread_1 create 5-9
196           Thread.sleep(100);
197         }
198 
199         for (int i = 0; i < 5; i++)
200         {
201           int k = j * 5 + i;
202           String path =
203               PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName, "TestDB" + k);
204           ZNRecord record = new ZNRecord("TestDB" + k);
205 
206           paths.add(path);
207           records.add(record);
208         }
209         boolean[] success = _accessor.setChildren(paths, records, AccessOption.PERSISTENT);
210         // System.out.println("thread-" + _id + " sets " + j  + ": " + Arrays.toString(success));
211         
212         for (int i = 0; i < 5; i++)
213         {
214           Assert.assertEquals(success[i], true);
215         }
216       }
217 
218       return true;
219     }
220   }
221   
222   @Test
223   public void testHappyPathZkCacheBaseDataAccessor()
224   {
225     String className = TestHelper.getTestClassName();
226     String methodName = TestHelper.getTestMethodName();
227     String clusterName = className + "_" + methodName;
228     System.out.println("START " + clusterName + " at "
229         + new Date(System.currentTimeMillis()));
230 
231     // init zkCacheDataAccessor
232     String curStatePath =
233         PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
234                                    clusterName,
235                                    "localhost_8901");
236     String extViewPath =
237         PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
238 
239     ZkBaseDataAccessor<ZNRecord> baseAccessor =
240         new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
241 
242     baseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
243 
244     List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
245     ZkCacheBaseDataAccessor<ZNRecord> accessor =
246         new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor,
247                                            null,
248                                            cachePaths,
249                                            null);
250 
251     // TestHelper.printCache(accessor._wtCache);
252     boolean ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
253     Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
254 
255     // create 10 current states using 2 threads
256     List<Callable<Boolean>> threads = new ArrayList<Callable<Boolean>>();
257     for (int i = 0; i < 2; i++)
258     {
259       threads.add(new TestCreateZkCacheBaseDataAccessor(accessor, clusterName, i));
260     }
261     TestHelper.startThreadsConcurrently(threads, 1000);
262 
263     // verify wtCache
264     // TestHelper.printCache(accessor._wtCache);
265     ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
266     Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
267     
268     // update 10 current states 10 times using 2 threads
269     threads.clear();
270     for (int i = 0; i < 2; i++)
271     {
272       threads.add(new TestUpdateZkCacheBaseDataAccessor(accessor, clusterName, i));
273     }
274     TestHelper.startThreadsConcurrently(threads, 1000);
275     
276     // verify wtCache
277     // TestHelper.printCache(accessor._wtCache);
278     ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
279     Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
280 
281     // set 10 external views using 2 threads
282     threads.clear();
283     for (int i = 0; i < 2; i++)
284     {
285       threads.add(new TestSetZkCacheBaseDataAccessor(accessor, clusterName, i));
286     }
287     TestHelper.startThreadsConcurrently(threads, 1000);
288     
289     // verify wtCache
290     // TestHelper.printCache(accessor._wtCache);
291     ret = TestHelper.verifyZkCache(cachePaths, accessor._wtCache._cache, _gZkClient, false);
292     Assert.assertTrue(ret, "wtCache doesn't match data on Zk");
293     
294     System.out.println("END " + clusterName + " at "
295         + new Date(System.currentTimeMillis()));
296   }
297 }