View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.List;
25  
26  import org.I0Itec.zkclient.DataUpdater;
27  import org.apache.helix.InstanceType;
28  import org.apache.helix.PropertyPathConfig;
29  import org.apache.helix.PropertyType;
30  import org.apache.helix.ZNRecord;
31  import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
32  import org.apache.log4j.Logger;
33  import org.apache.zookeeper.CreateMode;
34  import org.apache.zookeeper.data.Stat;
35  
36  
37  public final class ZKUtil
38  {
39    private static Logger logger = Logger.getLogger(ZKUtil.class);
40    private static int RETRYLIMIT = 3;
41  
/**
 * Private constructor: the class only exposes static helpers.
 */
private ZKUtil()
{
  // intentionally empty
}
45  
46    public static boolean isClusterSetup(String clusterName, ZkClient zkClient)
47    {
48      if (clusterName == null || zkClient == null)
49      {
50        return false;
51      }
52      ArrayList<String> requiredPaths = new ArrayList<String>();
53      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName));
54      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS,
55                                                   clusterName,
56                                                   ConfigScopeProperty.CLUSTER.toString(),
57                                                   clusterName));
58      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS,
59                                                   clusterName, 
60                                                   ConfigScopeProperty.PARTICIPANT.toString()));
61      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS,
62                                                   clusterName, 
63                                                   ConfigScopeProperty.RESOURCE.toString()));
64      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE,clusterName));
65      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.LIVEINSTANCES,clusterName));
66      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.INSTANCES,clusterName));
67      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW,clusterName));
68      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONTROLLER, clusterName));
69      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,clusterName));
70      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName));
71      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName));
72      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName));
73      requiredPaths.add(PropertyPathConfig.getPath(PropertyType.HISTORY,clusterName));
74      boolean isValid =true;
75      
76      for(String path:requiredPaths){
77        if(!zkClient.exists(path)){
78          isValid = false;
79          logger.info("Invalid cluster setup, missing znode path: " + path);
80        }
81      }
82      return isValid;
83    }
84  
85      public static boolean isInstanceSetup(ZkClient zkclient, String clusterName, String instanceName, InstanceType type)
86      {
87          if (type == InstanceType.PARTICIPANT || type == InstanceType.CONTROLLER_PARTICIPANT)
88          {
89              ArrayList<String> requiredPaths = new ArrayList<String>();
90              requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CONFIGS,
91                                                           clusterName,
92                                                           ConfigScopeProperty.PARTICIPANT.toString(),
93                                                           instanceName));
94              requiredPaths.add(PropertyPathConfig.getPath(PropertyType.MESSAGES,
95                                                           clusterName,
96                                                           instanceName));
97              requiredPaths.add(PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
98                                                           clusterName,
99                                                           instanceName));
100             requiredPaths.add(PropertyPathConfig.getPath(PropertyType.STATUSUPDATES,
101                                                          clusterName,
102                                                          instanceName));
103             requiredPaths.add(PropertyPathConfig.getPath(PropertyType.ERRORS,
104                                                          clusterName,
105                                                          instanceName));
106             boolean isValid =true;
107 
108             for(String path:requiredPaths){
109                 if(!zkclient.exists(path)){
110                     isValid =false;
111                     logger.info("Invalid instance setup, missing znode path: " + path);
112                 }
113             }
114             return isValid;
115         }
116 
117         return true;
118     }
119 
120   public static void createChildren(ZkClient client,
121                                     String parentPath,
122                                     List<ZNRecord> list)
123   {
124     client.createPersistent(parentPath, true);
125     if (list != null)
126     {
127       for (ZNRecord record : list)
128       {
129         createChildren(client, parentPath, record);
130       }
131     }
132   }
133 
134   public static void createChildren(ZkClient client,
135                                     String parentPath,
136                                     ZNRecord nodeRecord)
137   {
138     client.createPersistent(parentPath, true);
139 
140     String id = nodeRecord.getId();
141     String temp = parentPath + "/" + id;
142     client.createPersistent(temp, nodeRecord);
143   }
144 
145   public static void dropChildren(ZkClient client, String parentPath, List<ZNRecord> list)
146   {
147     // TODO: check if parentPath exists
148     if (list != null)
149     {
150       for (ZNRecord record : list)
151       {
152         dropChildren(client, parentPath, record);
153       }
154     }
155   }
156 
157   public static void dropChildren(ZkClient client, String parentPath, ZNRecord nodeRecord)
158   {
159     // TODO: check if parentPath exists
160     String id = nodeRecord.getId();
161     String temp = parentPath + "/" + id;
162     client.deleteRecursive(temp);
163   }
164 
165   public static List<ZNRecord> getChildren(ZkClient client, String path)
166   {
167     // parent watch will be set by zkClient
168     List<String> children = client.getChildren(path);
169     if (children == null || children.size() == 0)
170     {
171       return Collections.emptyList();
172     }
173 
174     List<ZNRecord> childRecords = new ArrayList<ZNRecord>();
175     for (String child : children)
176     {
177       String childPath = path + "/" + child;
178       Stat newStat = new Stat();
179       ZNRecord record = client.readDataAndStat(childPath, newStat, true);
180       if (record != null)
181       {
182         record.setVersion(newStat.getVersion());
183         record.setCreationTime(newStat.getCtime());
184         record.setModifiedTime(newStat.getMtime());
185         childRecords.add(record);
186       }
187     }
188     return childRecords;
189   }
190 
191   public static void updateIfExists(ZkClient client,
192                                     String path,
193                                     final ZNRecord record,
194                                     boolean mergeOnUpdate)
195   {
196     if (client.exists(path))
197     {
198       DataUpdater<Object> updater = new DataUpdater<Object>()
199       {
200         @Override
201         public Object update(Object currentData)
202         {
203           return record;
204         }
205       };
206       client.updateDataSerialized(path, updater);
207     }
208   }
209 
210   public static void createOrUpdate(ZkClient client,
211                                     String path,
212                                     final ZNRecord record,
213                                     final boolean persistent,
214                                     final boolean mergeOnUpdate)
215   {
216     int retryCount = 0;
217     while (retryCount < RETRYLIMIT)
218     {
219       try
220       {
221         if (client.exists(path))
222         {
223           DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
224           {
225             @Override
226             public ZNRecord update(ZNRecord currentData)
227             {
228               if (currentData != null && mergeOnUpdate)
229               {
230                 currentData.merge(record);
231                 return currentData;
232               }
233               return record;
234             }
235           };
236           client.updateDataSerialized(path, updater);
237         }
238         else
239         {
240           CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
241           if (record.getDeltaList().size() > 0)
242           {
243             ZNRecord value = new ZNRecord(record.getId());
244             value.merge(record);
245             client.create(path, value, mode);
246           }
247           else
248           {
249             client.create(path, record, mode);
250           }
251         }
252         break;
253       }
254       catch (Exception e)
255       {
256         retryCount = retryCount + 1;
257         logger.warn("Exception trying to update " + path + " Exception:" + e.getMessage()
258             + ". Will retry.");
259       }
260     }
261   }
262 
263   public static void asyncCreateOrUpdate(ZkClient client,
264                                          String path,
265                                          final ZNRecord record,
266                                          final boolean persistent,
267                                          final boolean mergeOnUpdate)
268   {
269     try
270     {
271       if (client.exists(path))
272       {
273         if (mergeOnUpdate)
274         {
275           ZNRecord curRecord = client.readData(path);
276           if (curRecord != null)
277           {
278             curRecord.merge(record);
279             client.asyncSetData(path, curRecord, -1, null);
280           }
281           else
282           {
283             client.asyncSetData(path, record, -1, null);
284           }
285         }
286         else
287         {
288           client.asyncSetData(path, record, -1, null);
289         }
290       }
291       else
292       {
293         CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
294         if (record.getDeltaList().size() > 0)
295         {
296           ZNRecord newRecord = new ZNRecord(record.getId());
297           newRecord.merge(record);
298           client.create(path, null, mode);
299 
300           client.asyncSetData(path, newRecord, -1, null);
301         }
302         else
303         {
304           client.create(path, null, mode);
305 
306           client.asyncSetData(path, record, -1, null);
307         }
308       }
309     }
310     catch (Exception e)
311     {
312       logger.error("Exception in async create or update " + path + ". Exception: "
313           + e.getMessage() + ". Give up.");
314     }
315   }
316 
317   public static void createOrReplace(ZkClient client,
318                                      String path,
319                                      final ZNRecord record,
320                                      final boolean persistent)
321   {
322     int retryCount = 0;
323     while (retryCount < RETRYLIMIT)
324     {
325       try
326       {
327         if (client.exists(path))
328         {
329           DataUpdater<Object> updater = new DataUpdater<Object>()
330           {
331             @Override
332             public Object update(Object currentData)
333             {
334               return record;
335             }
336           };
337           client.updateDataSerialized(path, updater);
338         }
339         else
340         {
341           CreateMode mode = (persistent) ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
342           client.create(path, record, mode);
343         }
344         break;
345       }
346       catch (Exception e)
347       {
348         retryCount = retryCount + 1;
349         logger.warn("Exception trying to createOrReplace " + path + " Exception:"
350             + e.getMessage() + ". Will retry.");
351       }
352     }
353   }
354 
355   public static void subtract(ZkClient client,
356                               String path,
357                               final ZNRecord recordTosubtract)
358   {
359     int retryCount = 0;
360     while (retryCount < RETRYLIMIT)
361     {
362       try
363       {
364         if (client.exists(path))
365         {
366           DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>()
367           {
368             @Override
369             public ZNRecord update(ZNRecord currentData)
370             {
371               currentData.subtract(recordTosubtract);
372               return currentData;
373             }
374           };
375           client.updateDataSerialized(path, updater);
376           break;
377         }
378       }
379       catch (Exception e)
380       {
381         retryCount = retryCount + 1;
382         logger.warn("Exception trying to createOrReplace " + path + " Exception:"
383             + e.getMessage() + ". Will retry.");
384         e.printStackTrace();
385       }
386     }
387 
388   }
389 }