View Javadoc

1   package org.apache.helix.store.zk;
2   
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
21  
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.store.HelixPropertyListener;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.testng.Assert;
import org.testng.annotations.Test;
40  
41  
42  public class TestZkHelixPropertyStore extends ZkUnitTestBase
43  {
44    final String _root         = "/" + getShortClassName();
45    final int    bufSize       = 128;
46    final int    mapNr         = 10;
47    final int    firstLevelNr  = 10;
48    final int    secondLevelNr = 10;
49  
50    // final int totalNodes = firstLevelNr * secondLevelNr;
51  
52    class TestListener implements HelixPropertyListener
53    {
54      Map<String, Long> _changeKeys = new HashMap<String, Long>();
55      Map<String, Long> _createKeys = new HashMap<String, Long>();
56      Map<String, Long> _deleteKeys = new HashMap<String, Long>();
57  
58      public void reset()
59      {
60        _changeKeys.clear();
61        _createKeys.clear();
62        _deleteKeys.clear();
63      }
64  
65      @Override
66      public void onDataChange(String path)
67      {
68        _changeKeys.put(path, new Long(System.currentTimeMillis()));
69      }
70  
71      @Override
72      public void onDataCreate(String path)
73      {
74        _createKeys.put(path, new Long(System.currentTimeMillis()));
75      }
76  
77      @Override
78      public void onDataDelete(String path)
79      {
80        _deleteKeys.put(path, new Long(System.currentTimeMillis()));
81      }
82    }
83  
84    @Test
85    public void testSet()
86    {
87      // Logger.getRootLogger().setLevel(Level.INFO);
88  
89      System.out.println("START testSet() at " + new Date(System.currentTimeMillis()));
90  
91      String subRoot = _root + "/" + "set";
92      List<String> subscribedPaths = new ArrayList<String>();
93      subscribedPaths.add(subRoot);
94      ZkHelixPropertyStore<ZNRecord> store =
95          new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
96                                             subRoot,
97                                             subscribedPaths);
98  
99      // test set
100     setNodes(store, 'a', false);
101     for (int i = 0; i < 10; i++)
102     {
103       for (int j = 0; j < 10; j++)
104       {
105         String nodeId = getNodeId(i, j);
106         String key = getSecondLevelKey(i, j);
107         ZNRecord record = store.get(key, null, 0);
108         Assert.assertEquals(record.getId(), nodeId);
109       }
110     }
111 
112     // test get from cache
113     long startT = System.currentTimeMillis();
114     for (int i = 0; i < 1000; i++)
115     {
116       ZNRecord record = store.get("/node_0/childNode_0_0", null, 0);
117       Assert.assertNotNull(record);
118     }
119     long endT = System.currentTimeMillis();
120     System.out.println("1000 Get() time used: " + (endT - startT) + "ms");
121     long latency = endT - startT;
122     Assert.assertTrue(latency < 200, "1000 Gets should be finished within 200ms, but was " + latency + " ms");
123 
124     store.stop();
125     System.out.println("END testSet() at " + new Date(System.currentTimeMillis()));
126   }
127   
128   @Test
129   public void testSetInvalidPath()
130   {
131 	    String subRoot = _root + "/" + "setInvalidPath";
132 	    ZkHelixPropertyStore<ZNRecord> store =
133 	        new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
134 	                                           subRoot,
135 	                                           null);
136 	    try {
137 	    	store.set("abc/xyz", new ZNRecord("testInvalid"), AccessOption.PERSISTENT);
138 	    	Assert.fail("Should throw illegal-argument-exception since path doesn't start with /");
139 	    } catch (IllegalArgumentException e) {
140 	    	// e.printStackTrace();
141 	    	// OK
142 	    } catch (Exception e) {
143 	    	Assert.fail("Should not throw exceptions other than illegal-argument");
144 	    }
145   }
146 
147   @Test
148   public void testLocalTriggeredCallback() throws Exception
149   {
150     System.out.println("START testLocalTriggeredCallback() at "
151         + new Date(System.currentTimeMillis()));
152 
153     String subRoot = _root + "/" + "localCallback";
154     List<String> subscribedPaths = new ArrayList<String>();
155     subscribedPaths.add(subRoot);
156     ZkHelixPropertyStore<ZNRecord> store =
157         new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
158                                            subRoot,
159                                            subscribedPaths);
160 
161     // change nodes via property store interface
162     // and verify all notifications have been received
163     TestListener listener = new TestListener();
164     store.subscribe("/", listener);
165 
166     // test dataCreate callbacks
167     listener.reset();
168     setNodes(store, 'a', true);
169 
170     // wait until all callbacks have been received
171     Thread.sleep(500);
172     int expectCreateNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
173     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
174         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
175     Assert.assertTrue(listener._createKeys.size() == expectCreateNodes, "Should receive "
176         + expectCreateNodes + " create callbacks");
177 
178     // test dataChange callbacks
179     listener.reset();
180     setNodes(store, 'b', true);
181 
182     // wait until all callbacks have been received
183     Thread.sleep(500);
184     int expectChangeNodes = firstLevelNr * secondLevelNr;
185     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
186         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
187     Assert.assertTrue(listener._changeKeys.size() >= expectChangeNodes,
188                       "Should receive at least " + expectChangeNodes
189                           + " change callbacks");
190 
191     // test delete callbacks
192     listener.reset();
193     int expectDeleteNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
194     store.remove("/", 0);
195     // wait until all callbacks have been received
196     for (int i = 0; i < 10; i++) {
197     	if (listener._deleteKeys.size() == expectDeleteNodes)
198     		break;
199     	Thread.sleep(500);
200     }
201 
202     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
203         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
204     Assert.assertTrue(listener._deleteKeys.size() == expectDeleteNodes, "Should receive "
205         + expectDeleteNodes + " delete callbacks");
206 
207     store.stop();
208     System.out.println("END testLocalTriggeredCallback() at "
209         + new Date(System.currentTimeMillis()));
210   }
211 
212   @Test
213   public void testZkTriggeredCallback() throws Exception
214   {
215     System.out.println("START testZkTriggeredCallback() at "
216         + new Date(System.currentTimeMillis()));
217 
218     String subRoot = _root + "/" + "zkCallback";
219     List<String> subscribedPaths = Arrays.asList(subRoot);
220     ZkHelixPropertyStore<ZNRecord> store =
221         new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
222                                            subRoot,
223                                            subscribedPaths);
224 
225     // change nodes via property store interface
226     // and verify all notifications have been received
227     TestListener listener = new TestListener();
228     store.subscribe("/", listener);
229 
230     // test create callbacks
231     listener.reset();
232     setNodes(_gZkClient, subRoot, 'a', true);
233     int expectCreateNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
234     Thread.sleep(500);
235 
236     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
237         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
238     Assert.assertTrue(listener._createKeys.size() == expectCreateNodes, "Should receive "
239         + expectCreateNodes + " create callbacks");
240 
241     // test change callbacks
242     listener.reset();
243     setNodes(_gZkClient, subRoot, 'b', true);
244     int expectChangeNodes = firstLevelNr * secondLevelNr;
245     for (int i = 0; i < 10; i++) {
246     	if (listener._changeKeys.size() >= expectChangeNodes)
247     		break;
248     	Thread.sleep(500);
249     }
250 
251     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
252         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
253     Assert.assertTrue(listener._changeKeys.size() >= expectChangeNodes,
254                       "Should receive at least " + expectChangeNodes
255                           + " change callbacks");
256 
257     // test delete callbacks
258     listener.reset();
259     int expectDeleteNodes = 1 + firstLevelNr + firstLevelNr * secondLevelNr;
260     _gZkClient.deleteRecursive(subRoot);
261     Thread.sleep(1000);
262 
263     System.out.println("createKey#:" + listener._createKeys.size() + ", changeKey#:"
264         + listener._changeKeys.size() + ", deleteKey#:" + listener._deleteKeys.size());
265     Assert.assertTrue(listener._deleteKeys.size() == expectDeleteNodes, "Should receive "
266         + expectDeleteNodes + " delete callbacks");
267 
268     store.stop();
269     System.out.println("END testZkTriggeredCallback() at "
270         + new Date(System.currentTimeMillis()));
271   }
272 
273   @Test
274   public void testBackToBackRemoveAndSet() throws Exception
275   {
276     System.out.println("START testBackToBackRemoveAndSet() at "
277         + new Date(System.currentTimeMillis()));
278 
279     String subRoot = _root + "/" + "backToBackRemoveAndSet";
280     List<String> subscribedPaths = new ArrayList<String>();
281     subscribedPaths.add(subRoot);
282     ZkHelixPropertyStore<ZNRecord> store =
283         new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_gZkClient),
284                                            subRoot,
285                                            subscribedPaths);
286 
287     store.set("/child0", new ZNRecord("child0"), AccessOption.PERSISTENT);
288 
289     ZNRecord record = store.get("/child0", null, 0); // will put the record in cache
290     Assert.assertEquals(record.getId(), "child0");
291     // System.out.println("1:get:" + record);
292 
293     String child0Path = subRoot + "/child0";
294     for (int i = 0; i < 2; i++)
295     {
296       _gZkClient.delete(child0Path);
297       _gZkClient.createPersistent(child0Path, new ZNRecord("child0-new-" + i));
298     }
299 
300     Thread.sleep(500); // should wait for zk callback to add "/child0" into cache
301     record = store.get("/child0", null, 0);
302     Assert.assertEquals(record.getId(),
303                         "child0-new-1",
304                         "Cache shoulde be updated to latest create");
305     // System.out.println("2:get:" + record);
306 
307     _gZkClient.delete(child0Path);
308     Thread.sleep(500); // should wait for zk callback to remove "/child0" from cache
309     try
310     {
311       record = store.get("/child0", null, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
312       Assert.fail("/child0 should have been removed");
313     }
314     catch (ZkNoNodeException e)
315     {
316       // OK.
317     }
318     // System.out.println("3:get:" + record);
319 
320     store.stop();
321     System.out.println("END testBackToBackRemoveAndSet() at "
322         + new Date(System.currentTimeMillis()));
323   }
324 
325   private String getNodeId(int i, int j)
326   {
327     return "childNode_" + i + "_" + j;
328   }
329 
330   private String getSecondLevelKey(int i, int j)
331   {
332     return "/node_" + i + "/" + getNodeId(i, j);
333   }
334 
335   private String getFirstLevelKey(int i)
336   {
337     return "/node_" + i;
338   }
339 
340   private void setNodes(ZkHelixPropertyStore<ZNRecord> store,
341                         char c,
342                         boolean needTimestamp)
343   {
344     char[] data = new char[bufSize];
345 
346     for (int i = 0; i < bufSize; i++)
347     {
348       data[i] = c;
349     }
350 
351     Map<String, String> map = new TreeMap<String, String>();
352     for (int i = 0; i < mapNr; i++)
353     {
354       map.put("key_" + i, new String(data));
355     }
356 
357     for (int i = 0; i < firstLevelNr; i++)
358     {
359       for (int j = 0; j < secondLevelNr; j++)
360       {
361         String nodeId = getNodeId(i, j);
362         ZNRecord record = new ZNRecord(nodeId);
363         record.setSimpleFields(map);
364         if (needTimestamp)
365         {
366           long now = System.currentTimeMillis();
367           record.setSimpleField("SetTimestamp", Long.toString(now));
368         }
369         String key = getSecondLevelKey(i, j);
370         store.set(key, record, AccessOption.PERSISTENT);
371       }
372     }
373   }
374 
375   private void setNodes(ZkClient zkClient, String root, char c, boolean needTimestamp)
376   {
377     char[] data = new char[bufSize];
378 
379     for (int i = 0; i < bufSize; i++)
380     {
381       data[i] = c;
382     }
383 
384     Map<String, String> map = new TreeMap<String, String>();
385     for (int i = 0; i < mapNr; i++)
386     {
387       map.put("key_" + i, new String(data));
388     }
389 
390     for (int i = 0; i < firstLevelNr; i++)
391     {
392       String firstLevelKey = getFirstLevelKey(i);
393 
394       for (int j = 0; j < secondLevelNr; j++)
395       {
396         String nodeId = getNodeId(i, j);
397         ZNRecord record = new ZNRecord(nodeId);
398         record.setSimpleFields(map);
399         if (needTimestamp)
400         {
401           long now = System.currentTimeMillis();
402           record.setSimpleField("SetTimestamp", Long.toString(now));
403         }
404         String key = getSecondLevelKey(i, j);
405         try
406         {
407           zkClient.writeData(root + key, record);
408         }
409         catch (ZkNoNodeException e)
410         {
411           zkClient.createPersistent(root + firstLevelKey, true);
412           zkClient.createPersistent(root + key, record);
413         }
414       }
415     }
416   }
417 
418 }