View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collections;
25  import java.util.Date;
26  import java.util.List;
27  import java.util.Set;
28  import java.util.TreeSet;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  
32  import org.apache.helix.AccessOption;
33  import org.apache.helix.PropertyPathConfig;
34  import org.apache.helix.PropertyType;
35  import org.apache.helix.TestHelper;
36  import org.apache.helix.ZNRecord;
37  import org.apache.helix.ZNRecordUpdater;
38  import org.apache.helix.ZkUnitTestBase;
39  import org.apache.helix.manager.zk.ZNRecordSerializer;
40  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
41  import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
42  import org.apache.helix.manager.zk.ZkClient;
43  import org.apache.helix.store.HelixPropertyListener;
44  import org.testng.Assert;
45  import org.testng.annotations.Test;
46  
47  
48  public class TestZkCacheSyncOpSingleThread extends ZkUnitTestBase
49  {
50    class TestListener implements HelixPropertyListener
51    {
52      ConcurrentLinkedQueue<String> _deletePathQueue = new ConcurrentLinkedQueue<String>();
53      ConcurrentLinkedQueue<String> _createPathQueue = new ConcurrentLinkedQueue<String>();
54      ConcurrentLinkedQueue<String> _changePathQueue = new ConcurrentLinkedQueue<String>();
55  
56      @Override
57      public void onDataDelete(String path)
58      {
59        // System.out.println(Thread.currentThread().getName() + ", onDelete: " + path);
60      	_deletePathQueue.add(path);
61      }
62  
63      @Override
64      public void onDataCreate(String path)
65      {
66        // System.out.println(Thread.currentThread().getName() + ", onCreate: " + path);
67      	_createPathQueue.add(path);
68      }
69  
70      @Override
71      public void onDataChange(String path)
72      {
73        // System.out.println(Thread.currentThread().getName() + ", onChange: " + path);
74      	_changePathQueue.add(path);
75      }
76  
77      public void reset()
78      {
79        _deletePathQueue.clear();
80        _createPathQueue.clear();
81        _changePathQueue.clear();
82      }
83    }
84  
85    @Test
86    public void testZkCacheCallbackExternalOpNoChroot() throws Exception
87    {
88      String className = TestHelper.getTestClassName();
89      String methodName = TestHelper.getTestMethodName();
90      String clusterName = className + "_" + methodName;
91      System.out.println("START " + clusterName + " at "
92          + new Date(System.currentTimeMillis()));
93  
94      // init external base data accessor
95      ZkClient zkclient = new ZkClient(ZK_ADDR);
96      zkclient.setZkSerializer(new ZNRecordSerializer());
97      ZkBaseDataAccessor<ZNRecord> extBaseAccessor =
98          new ZkBaseDataAccessor<ZNRecord>(zkclient);
99  
100     // init zkCacheDataAccessor
101     String curStatePath =
102         PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
103                                    clusterName,
104                                    "localhost_8901");
105     String extViewPath =
106         PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
107 
108     ZkBaseDataAccessor<ZNRecord> baseAccessor =
109         new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
110 
111     extBaseAccessor.create(curStatePath, null, AccessOption.PERSISTENT);
112 
113     List<String> cachePaths = Arrays.asList(curStatePath, extViewPath);
114     ZkCacheBaseDataAccessor<ZNRecord> accessor =
115         new ZkCacheBaseDataAccessor<ZNRecord>(baseAccessor, null, null, cachePaths);
116     // TestHelper.printCache(accessor._zkCache._cache);
117 
118     TestListener listener = new TestListener();
119     accessor.subscribe(curStatePath, listener);
120 
121     // create 10 current states
122     List<String> createPaths = new ArrayList<String>();
123     for (int i = 0; i < 10; i++)
124     {
125       String path = curStatePath + "/session_0/TestDB" + i;
126       createPaths.add(path);
127       boolean success =
128           extBaseAccessor.create(path,
129                                  new ZNRecord("TestDB" + i),
130                                  AccessOption.PERSISTENT);
131       Assert.assertTrue(success, "Should succeed in create: " + path);
132     }
133 
134     Thread.sleep(500);
135 
136     // verify cache
137     // TestHelper.printCache(accessor._zkCache._cache);
138     boolean ret =
139         TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
140     // System.out.println("ret: " + ret);
141     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
142     System.out.println("createCnt: " + listener._createPathQueue.size());
143     Assert.assertEquals(listener._createPathQueue.size(),
144                         11,
145                         "Shall get 11 onCreate callbacks.");
146 
147     // verify each callback path
148     createPaths.add(curStatePath + "/session_0");
149     List<String> createCallbackPaths = new ArrayList<String>(listener._createPathQueue);
150     Collections.sort(createPaths);
151     Collections.sort(createCallbackPaths);
152     // System.out.println("createCallbackPaths: " + createCallbackPaths);
153     Assert.assertEquals(createCallbackPaths,
154                         createPaths,
155                         "Should get create callbacks at " + createPaths + ", but was "
156                             + createCallbackPaths);
157 
158     // update each current state, single thread
159     List<String> updatePaths = new ArrayList<String>();
160     listener.reset();
161     for (int i = 0; i < 10; i++)
162     {
163       String path = curStatePath + "/session_0/TestDB" + i;
164       for (int j = 0; j < 10; j++)
165       {
166         updatePaths.add(path);
167         ZNRecord newRecord = new ZNRecord("TestDB" + i);
168         newRecord.setSimpleField("" + j, "" + j);
169         boolean success =
170             accessor.update(path, new ZNRecordUpdater(newRecord), AccessOption.PERSISTENT);
171         Assert.assertTrue(success, "Should succeed in update: " + path);
172       }
173     }
174     Thread.sleep(500);
175 
176     // verify cache
177     // TestHelper.printCache(accessor._zkCache._cache);
178     ret =
179         TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
180     // System.out.println("ret: " + ret);
181     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
182     System.out.println("changeCnt: " + listener._changePathQueue.size());
183     Assert.assertEquals(listener._changePathQueue.size(),
184                         100,
185                         "Shall get 100 onChange callbacks.");
186 
187     // verify each callback path
188     List<String> updateCallbackPaths = new ArrayList<String>(listener._changePathQueue);
189     Collections.sort(updatePaths);
190     Collections.sort(updateCallbackPaths);
191     Assert.assertEquals(updateCallbackPaths,
192                         updatePaths,
193                         "Should get change callbacks at " + updatePaths + ", but was "
194                             + updateCallbackPaths);
195 
196     // remove 10 current states
197     TreeSet<String> removePaths = new TreeSet<String>();
198     listener.reset();
199     for (int i = 0; i < 10; i++)
200     {
201       String path = curStatePath + "/session_0/TestDB" + i;
202       removePaths.add(path);
203       boolean success = accessor.remove(path, AccessOption.PERSISTENT);
204       Assert.assertTrue(success, "Should succeed in remove: " + path);
205     }
206     Thread.sleep(500);
207 
208     // verify cache
209     // TestHelper.printCache(accessor._zkCache._cache);
210     ret =
211         TestHelper.verifyZkCache(cachePaths, accessor._zkCache._cache, _gZkClient, true);
212     // System.out.println("ret: " + ret);
213     Assert.assertTrue(ret, "zkCache doesn't match data on Zk");
214     System.out.println("deleteCnt: " + listener._deletePathQueue.size());
215     Assert.assertTrue(listener._deletePathQueue.size() >= 10,
216                         "Shall get at least 10 onDelete callbacks.");
217 
218     // verify each callback path
219     Set<String> removeCallbackPaths = new TreeSet<String>(listener._deletePathQueue);
220     Assert.assertEquals(removeCallbackPaths,
221                         removePaths,
222                         "Should get remove callbacks at " + removePaths + ", but was "
223                             + removeCallbackPaths);
224 
225     System.out.println("END " + clusterName + " at "
226         + new Date(System.currentTimeMillis()));
227   }
228 }