View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  import java.util.ArrayList;
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  
27  import org.I0Itec.zkclient.DataUpdater;
28  import org.I0Itec.zkclient.exception.ZkNoNodeException;
29  import org.apache.helix.AccessOption;
30  import org.apache.helix.BaseDataAccessor;
31  import org.apache.helix.ControllerChangeListener;
32  import org.apache.helix.GroupCommit;
33  import org.apache.helix.HelixDataAccessor;
34  import org.apache.helix.HelixException;
35  import org.apache.helix.HelixProperty;
36  import org.apache.helix.InstanceType;
37  import org.apache.helix.NotificationContext;
38  import org.apache.helix.PropertyKey;
39  import org.apache.helix.PropertyKey.Builder;
40  import org.apache.helix.PropertyType;
41  import org.apache.helix.ZNRecord;
42  import org.apache.helix.ZNRecordAssembler;
43  import org.apache.helix.ZNRecordBucketizer;
44  import org.apache.helix.ZNRecordUpdater;
45  import org.apache.helix.controller.restlet.ZNRecordUpdate;
46  import org.apache.helix.controller.restlet.ZNRecordUpdate.OpCode;
47  import org.apache.helix.controller.restlet.ZkPropertyTransferClient;
48  import org.apache.helix.model.LiveInstance;
49  import org.apache.log4j.Logger;
50  import org.apache.zookeeper.data.Stat;
51  
52  
53  public class ZKHelixDataAccessor implements HelixDataAccessor, ControllerChangeListener
54  {
55    private static Logger                    LOG                       =
56                                                                           Logger.getLogger(ZKHelixDataAccessor.class);
57    private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
58    final InstanceType                       _instanceType;
59    private final String                     _clusterName;
60    private final Builder                    _propertyKeyBuilder;
61    ZkPropertyTransferClient                 _zkPropertyTransferClient = null;
62    private final GroupCommit                _groupCommit              = new GroupCommit();
63    String                                   _zkPropertyTransferSvcUrl = null;
64  
65    public ZKHelixDataAccessor(String clusterName,
66                               BaseDataAccessor<ZNRecord> baseDataAccessor)
67    {
68      this(clusterName, null, baseDataAccessor);
69    }
70  
71    public ZKHelixDataAccessor(String clusterName,
72                               InstanceType instanceType,
73                               BaseDataAccessor<ZNRecord> baseDataAccessor)
74    {
75      _clusterName = clusterName;
76      _instanceType = instanceType;
77      _baseDataAccessor = baseDataAccessor;
78      _propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
79    }
80  
81    @Override
82    public <T extends HelixProperty> boolean createProperty(PropertyKey key, T value)
83    {
84      PropertyType type = key.getType();
85      String path = key.getPath();
86      int options = constructOptions(type);
87      return _baseDataAccessor.create(path, value.getRecord(), options);
88    }
89  
90    @Override
91    public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value)
92    {
93      PropertyType type = key.getType();
94      if (!value.isValid())
95      {
96        throw new HelixException("The ZNRecord for " + type + " is not valid.");
97      }
98  
99      String path = key.getPath();
100     int options = constructOptions(type);
101 
102     if (type.usePropertyTransferServer())
103     {
104       if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null)
105       {
106         ZNRecordUpdate update = new ZNRecordUpdate(path, OpCode.SET, value.getRecord());
107         _zkPropertyTransferClient.enqueueZNRecordUpdate(update, _zkPropertyTransferSvcUrl);
108         return true;
109       }
110     }
111 
112     boolean success = false;
113     switch (type)
114     {
115     case IDEALSTATES:
116     case EXTERNALVIEW:
117       // check if bucketized
118       if (value.getBucketSize() > 0)
119       {
120         // set parent node
121         ZNRecord metaRecord = new ZNRecord(value.getId());
122         metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
123         success = _baseDataAccessor.set(path, metaRecord, options);
124         if (success)
125         {
126           ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
127 
128           Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
129           List<String> paths = new ArrayList<String>();
130           List<ZNRecord> bucketizedRecords = new ArrayList<ZNRecord>();
131           for (String bucketName : map.keySet())
132           {
133             paths.add(path + "/" + bucketName);
134             bucketizedRecords.add(map.get(bucketName));
135           }
136 
137           // TODO: set success accordingly
138           _baseDataAccessor.setChildren(paths, bucketizedRecords, options);
139         }
140       }
141       else
142       {
143         success = _baseDataAccessor.set(path, value.getRecord(), options);
144       }
145       break;
146     default:
147       success = _baseDataAccessor.set(path, value.getRecord(), options);
148       break;
149     }
150     return success;
151   }
152 
153   @Override
154   public <T extends HelixProperty> boolean updateProperty(PropertyKey key, T value)
155   {
156     PropertyType type = key.getType();
157     String path = key.getPath();
158     int options = constructOptions(type);
159 
160     boolean success = false;
161     switch (type)
162     {
163     case CURRENTSTATES:
164       success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord());
165       break;
166     default:
167       if (type.usePropertyTransferServer())
168       {
169         if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferClient != null)
170         {
171           ZNRecordUpdate update =
172               new ZNRecordUpdate(path, OpCode.UPDATE, value.getRecord());
173           _zkPropertyTransferClient.enqueueZNRecordUpdate(update,
174                                                           _zkPropertyTransferSvcUrl);
175 
176           return true;
177         }
178         else
179         {
180           if(LOG.isTraceEnabled()){
181             LOG.trace("getPropertyTransferUrl is null, skip updating the value");
182           }
183           return true;
184         }
185       }
186       success =
187           _baseDataAccessor.update(path, new ZNRecordUpdater(value.getRecord()), options);
188       break;
189     }
190     return success;
191   }
192 
193   @Override
194   public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys)
195   {
196     if (keys == null || keys.size() == 0)
197     {
198       return Collections.emptyList();
199     }
200 
201     List<T> childValues = new ArrayList<T>();
202 
203     // read all records
204     List<String> paths = new ArrayList<String>();
205     for (PropertyKey key : keys)
206     {
207       paths.add(key.getPath());
208     }
209     List<ZNRecord> children = _baseDataAccessor.get(paths, null, 0);
210 
211     // check if bucketized
212     for (int i = 0; i < keys.size(); i++)
213     {
214       PropertyKey key = keys.get(i);
215       ZNRecord record = children.get(i);
216 
217       PropertyType type = key.getType();
218       String path = key.getPath();
219       int options = constructOptions(type);
220       // ZNRecord record = null;
221 
222       switch (type)
223       {
224       case CURRENTSTATES:
225       case IDEALSTATES:
226       case EXTERNALVIEW:
227         // check if bucketized
228         if (record != null)
229         {
230           HelixProperty property = new HelixProperty(record);
231 
232           int bucketSize = property.getBucketSize();
233           if (bucketSize > 0)
234           {
235             List<ZNRecord> childRecords =
236                 _baseDataAccessor.getChildren(path, null, options);
237             ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
238 
239             // merge with parent node value
240             if (assembledRecord != null)
241             {
242               record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
243               record.getListFields().putAll(assembledRecord.getListFields());
244               record.getMapFields().putAll(assembledRecord.getMapFields());
245             }
246           }
247         }
248         break;
249       default:
250         break;
251       }
252 
253       @SuppressWarnings("unchecked")
254       T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
255       childValues.add(t);
256     }
257 
258     return childValues;
259   }
260 
261   @Override
262   public <T extends HelixProperty> T getProperty(PropertyKey key)
263   {
264     PropertyType type = key.getType();
265     String path = key.getPath();
266     int options = constructOptions(type);
267     ZNRecord record = null;
268     try
269     {
270       Stat stat = new Stat();
271       record = _baseDataAccessor.get(path, stat, options);
272       if (record != null)
273       {
274         record.setCreationTime(stat.getCtime());
275         record.setModifiedTime(stat.getMtime());
276       }
277     }
278     catch (ZkNoNodeException e)
279     {
280       // OK
281     }
282 
283     switch (type)
284     {
285     case CURRENTSTATES:
286     case IDEALSTATES:
287     case EXTERNALVIEW:
288       // check if bucketized
289       if (record != null)
290       {
291         HelixProperty property = new HelixProperty(record);
292 
293         int bucketSize = property.getBucketSize();
294         if (bucketSize > 0)
295         {
296           List<ZNRecord> childRecords =
297               _baseDataAccessor.getChildren(path, null, options);
298           ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
299 
300           // merge with parent node value
301           if (assembledRecord != null)
302           {
303             record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
304             record.getListFields().putAll(assembledRecord.getListFields());
305             record.getMapFields().putAll(assembledRecord.getMapFields());
306           }
307         }
308       }
309       break;
310     default:
311       break;
312     }
313 
314     @SuppressWarnings("unchecked")
315     T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
316     return t;
317   }
318 
319   @Override
320   public boolean removeProperty(PropertyKey key)
321   {
322     PropertyType type = key.getType();
323     String path = key.getPath();
324     int options = constructOptions(type);
325 
326     return _baseDataAccessor.remove(path, options);
327   }
328 
329   @Override
330   public List<String> getChildNames(PropertyKey key)
331   {
332     PropertyType type = key.getType();
333     String parentPath = key.getPath();
334     int options = constructOptions(type);
335     List<String> childNames = _baseDataAccessor.getChildNames(parentPath, options);
336     if (childNames == null)
337     {
338       childNames = Collections.emptyList();
339     }
340     return childNames;
341   }
342 
343   @Override
344   public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
345   {
346     PropertyType type = key.getType();
347     String parentPath = key.getPath();
348     int options = constructOptions(type);
349     List<T> childValues = new ArrayList<T>();
350 
351     List<ZNRecord> children = _baseDataAccessor.getChildren(parentPath, null, options);
352     if (children != null)
353     {
354       for (ZNRecord record : children)
355       {
356         switch (type)
357         {
358         case CURRENTSTATES:
359         case IDEALSTATES:
360         case EXTERNALVIEW:
361           if (record != null)
362           {
363             HelixProperty property = new HelixProperty(record);
364 
365             int bucketSize = property.getBucketSize();
366             if (bucketSize > 0)
367             {
368               // TODO: fix this if record.id != pathName
369               String childPath = parentPath + "/" + record.getId();
370               List<ZNRecord> childRecords =
371                   _baseDataAccessor.getChildren(childPath, null, options);
372               ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
373 
374               // merge with parent node value
375               if (assembledRecord != null)
376               {
377                 record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
378                 record.getListFields().putAll(assembledRecord.getListFields());
379                 record.getMapFields().putAll(assembledRecord.getMapFields());
380               }
381             }
382           }
383 
384           break;
385         default:
386           break;
387         }
388 
389         if (record != null)
390         {
391           @SuppressWarnings("unchecked")
392           T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
393           childValues.add(t);
394         }
395       }
396     }
397     return childValues;
398   }
399 
400   @Override
401   public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key)
402   {
403     PropertyType type = key.getType();
404     String parentPath = key.getPath();
405     int options = constructOptions(type);
406     List<T> children = getChildValues(key);
407     Map<String, T> childValuesMap = new HashMap<String, T>();
408     for (T t : children)
409     {
410       childValuesMap.put(t.getRecord().getId(), t);
411     }
412     return childValuesMap;
413 
414   }
415 
416   @Override
417   public Builder keyBuilder()
418   {
419     return _propertyKeyBuilder;
420   }
421 
422   private int constructOptions(PropertyType type)
423   {
424     int options = 0;
425     if (type.isPersistent())
426     {
427       options = options | AccessOption.PERSISTENT;
428     }
429     else
430     {
431       options = options | AccessOption.EPHEMERAL;
432     }
433 
434     return options;
435   }
436 
437   @Override
438   public <T extends HelixProperty> boolean[] createChildren(List<PropertyKey> keys,
439                                                             List<T> children)
440   {
441     // TODO: add validation
442     int options = -1;
443     List<String> paths = new ArrayList<String>();
444     List<ZNRecord> records = new ArrayList<ZNRecord>();
445     for (int i = 0; i < keys.size(); i++)
446     {
447       PropertyKey key = keys.get(i);
448       PropertyType type = key.getType();
449       String path = key.getPath();
450       paths.add(path);
451       HelixProperty value = children.get(i);
452       records.add(value.getRecord());
453       options = constructOptions(type);
454     }
455     return _baseDataAccessor.createChildren(paths, records, options);
456   }
457 
458   @Override
459   public <T extends HelixProperty> boolean[] setChildren(List<PropertyKey> keys,
460                                                          List<T> children)
461   {
462     int options = -1;
463     List<String> paths = new ArrayList<String>();
464     List<ZNRecord> records = new ArrayList<ZNRecord>();
465 
466     List<List<String>> bucketizedPaths =
467         new ArrayList<List<String>>(Collections.<List<String>> nCopies(keys.size(), null));
468     List<List<ZNRecord>> bucketizedRecords =
469         new ArrayList<List<ZNRecord>>(Collections.<List<ZNRecord>> nCopies(keys.size(),
470                                                                            null));
471 
472     for (int i = 0; i < keys.size(); i++)
473     {
474       PropertyKey key = keys.get(i);
475       PropertyType type = key.getType();
476       String path = key.getPath();
477       paths.add(path);
478       options = constructOptions(type);
479 
480       HelixProperty value = children.get(i);
481 
482       switch (type)
483       {
484       case EXTERNALVIEW:
485         if (value.getBucketSize() == 0)
486         {
487           records.add(value.getRecord());
488         }
489         else
490         {
491           _baseDataAccessor.remove(path, options);
492 
493           ZNRecord metaRecord = new ZNRecord(value.getId());
494           metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
495           records.add(metaRecord);
496 
497           ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
498 
499           Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
500           List<String> childBucketizedPaths = new ArrayList<String>();
501           List<ZNRecord> childBucketizedRecords = new ArrayList<ZNRecord>();
502           for (String bucketName : map.keySet())
503           {
504             childBucketizedPaths.add(path + "/" + bucketName);
505             childBucketizedRecords.add(map.get(bucketName));
506           }
507           bucketizedPaths.set(i, childBucketizedPaths);
508           bucketizedRecords.set(i, childBucketizedRecords);
509         }
510         break;
511       default:
512         records.add(value.getRecord());
513         break;
514       }
515     }
516 
517     // set non-bucketized nodes or parent nodes of bucketized nodes
518     boolean success[] = _baseDataAccessor.setChildren(paths, records, options);
519 
520     // set bucketized nodes
521     List<String> allBucketizedPaths = new ArrayList<String>();
522     List<ZNRecord> allBucketizedRecords = new ArrayList<ZNRecord>();
523 
524     for (int i = 0; i < keys.size(); i++)
525     {
526       if (success[i] && bucketizedPaths.get(i) != null)
527       {
528         allBucketizedPaths.addAll(bucketizedPaths.get(i));
529         allBucketizedRecords.addAll(bucketizedRecords.get(i));
530       }
531     }
532 
533     // TODO: set success accordingly
534     _baseDataAccessor.setChildren(allBucketizedPaths, allBucketizedRecords, options);
535 
536     return success;
537   }
538 
539   @Override
540   public BaseDataAccessor<ZNRecord> getBaseDataAccessor()
541   {
542     return _baseDataAccessor;
543   }
544 
545   @Override
546   public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
547                                                             List<DataUpdater<ZNRecord>> updaters,
548                                                             int options)
549   {
550     return _baseDataAccessor.updateChildren(paths, updaters, options);
551   }
552 
553   public void shutdown()
554   {
555     if (_zkPropertyTransferClient != null)
556     {
557       _zkPropertyTransferClient.shutdown();
558     }
559   }
560 
561   @Override
562   public void onControllerChange(NotificationContext changeContext)
563   {
564     LOG.info("Controller has changed");
565     refreshZkPropertyTransferUrl();
566     if (_zkPropertyTransferClient == null)
567     {
568       if (_zkPropertyTransferSvcUrl != null && _zkPropertyTransferSvcUrl.length() > 0)
569       {
570         LOG.info("Creating ZkPropertyTransferClient as we get url "
571             + _zkPropertyTransferSvcUrl);
572         _zkPropertyTransferClient =
573             new ZkPropertyTransferClient(ZkPropertyTransferClient.DEFAULT_MAX_CONCURRENTTASKS);
574       }
575     }
576   }
577 
578   void refreshZkPropertyTransferUrl()
579   {
580     try
581     {
582       LiveInstance leader = getProperty(keyBuilder().controllerLeader());
583       if (leader != null)
584       {
585         _zkPropertyTransferSvcUrl = leader.getWebserviceUrl();
586         LOG.info("_zkPropertyTransferSvcUrl : " + _zkPropertyTransferSvcUrl
587             + " Controller " + leader.getInstanceName());
588       }
589       else
590       {
591         _zkPropertyTransferSvcUrl = null;
592       }
593     }
594     catch (Exception e)
595     {
596       // LOG.error("", e);
597       _zkPropertyTransferSvcUrl = null;
598     }
599   }
600 }