View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.DataInputStream;
23  import java.io.File;
24  import java.io.FileInputStream;
25  import java.io.IOException;
26  import java.net.InetAddress;
27  import java.net.UnknownHostException;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.HashMap;
31  import java.util.HashSet;
32  import java.util.LinkedList;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.Set;
36  import java.util.TreeMap;
37  import java.util.UUID;
38  import java.util.concurrent.TimeUnit;
39  
40  import org.I0Itec.zkclient.DataUpdater;
41  import org.I0Itec.zkclient.exception.ZkNoNodeException;
42  import org.apache.helix.AccessOption;
43  import org.apache.helix.ConfigAccessor;
44  import org.apache.helix.HelixAdmin;
45  import org.apache.helix.HelixConstants;
46  import org.apache.helix.HelixDataAccessor;
47  import org.apache.helix.HelixDefinedState;
48  import org.apache.helix.HelixException;
49  import org.apache.helix.InstanceType;
50  import org.apache.helix.PropertyKey;
51  import org.apache.helix.PropertyKey.Builder;
52  import org.apache.helix.PropertyPathConfig;
53  import org.apache.helix.PropertyType;
54  import org.apache.helix.ZNRecord;
55  import org.apache.helix.alerts.AlertsHolder;
56  import org.apache.helix.alerts.StatsHolder;
57  import org.apache.helix.model.Alerts;
58  import org.apache.helix.model.ClusterConstraints;
59  import org.apache.helix.model.ClusterConstraints.ConstraintType;
60  import org.apache.helix.model.ConfigScope;
61  import org.apache.helix.model.ConstraintItem;
62  import org.apache.helix.model.CurrentState;
63  import org.apache.helix.model.ExternalView;
64  import org.apache.helix.model.HelixConfigScope;
65  import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
66  import org.apache.helix.model.IdealState;
67  import org.apache.helix.model.IdealState.IdealStateModeProperty;
68  import org.apache.helix.model.InstanceConfig;
69  import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
70  import org.apache.helix.model.LiveInstance;
71  import org.apache.helix.model.Message;
72  import org.apache.helix.model.Message.MessageState;
73  import org.apache.helix.model.Message.MessageType;
74  import org.apache.helix.model.PauseSignal;
75  import org.apache.helix.model.PersistentStats;
76  import org.apache.helix.model.StateModelDefinition;
77  import org.apache.helix.tools.DefaultIdealStateCalculator;
78  import org.apache.helix.util.HelixUtil;
79  import org.apache.helix.util.RebalanceUtil;
80  import org.apache.log4j.Logger;
81  
82  
83  public class ZKHelixAdmin implements HelixAdmin
84  {
85  
86    private final ZkClient _zkClient;
87    private final ConfigAccessor _configAccessor;
88  
89    private static Logger logger = Logger.getLogger(ZKHelixAdmin.class);
90  
91    public ZKHelixAdmin(ZkClient zkClient)
92    {
93      _zkClient = zkClient;
94      _configAccessor = new ConfigAccessor(zkClient);
95    }
96  
97    public ZKHelixAdmin(String zkAddress)
98    {
99      _zkClient = new ZkClient(zkAddress);
100     _zkClient.setZkSerializer(new ZNRecordSerializer());
101     _zkClient.waitUntilConnected(30, TimeUnit.SECONDS);
102     _configAccessor = new ConfigAccessor(_zkClient);
103   }
104   
105   @Override
106   public void addInstance(String clusterName, InstanceConfig instanceConfig)
107   {
108     if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
109     {
110       throw new HelixException("cluster " + clusterName + " is not setup yet");
111     }
112     String instanceConfigsPath =
113         PropertyPathConfig.getPath(PropertyType.CONFIGS,
114                                    clusterName,
115                                    ConfigScopeProperty.PARTICIPANT.toString());
116     String nodeId = instanceConfig.getId();
117     String instanceConfigPath = instanceConfigsPath + "/" + nodeId;
118 
119     if (_zkClient.exists(instanceConfigPath))
120     {
121       throw new HelixException("Node " + nodeId + " already exists in cluster "
122           + clusterName);
123     }
124 
125     ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
126 
127     _zkClient.createPersistent(HelixUtil.getMessagePath(clusterName, nodeId), true);
128     _zkClient.createPersistent(HelixUtil.getCurrentStateBasePath(clusterName, nodeId),
129                                true);
130     _zkClient.createPersistent(HelixUtil.getErrorsPath(clusterName, nodeId), true);
131     _zkClient.createPersistent(HelixUtil.getStatusUpdatesPath(clusterName, nodeId), true);
132   }
133 
134   @Override
135   public void dropInstance(String clusterName, InstanceConfig instanceConfig)
136   {
137     // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
138     String instanceConfigsPath =
139         PropertyPathConfig.getPath(PropertyType.CONFIGS,
140                                    clusterName,
141                                    ConfigScopeProperty.PARTICIPANT.toString());
142     String nodeId = instanceConfig.getId();
143     String instanceConfigPath = instanceConfigsPath + "/" + nodeId;
144     String instancePath = HelixUtil.getInstancePath(clusterName, nodeId);
145 
146     if (!_zkClient.exists(instanceConfigPath))
147     {
148       throw new HelixException("Node " + nodeId
149           + " does not exist in config for cluster " + clusterName);
150     }
151 
152     if (!_zkClient.exists(instancePath))
153     {
154       throw new HelixException("Node " + nodeId
155           + " does not exist in instances for cluster " + clusterName);
156     }
157 
158     // delete config path
159     ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
160 
161     // delete instance path
162     _zkClient.deleteRecursive(instancePath);
163   }
164 
165   @Override
166   public InstanceConfig getInstanceConfig(String clusterName, String instanceName)
167   {
168     // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
169 
170     // String instanceConfigPath = instanceConfigsPath + "/" + instanceName;
171     String instanceConfigPath =
172         PropertyPathConfig.getPath(PropertyType.CONFIGS,
173                                    clusterName,
174                                    ConfigScopeProperty.PARTICIPANT.toString(),
175                                    instanceName);
176     if (!_zkClient.exists(instanceConfigPath))
177     {
178       throw new HelixException("instance" + instanceName + " does not exist in cluster "
179           + clusterName);
180     }
181 
182     ZKHelixDataAccessor accessor =
183         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
184     Builder keyBuilder = accessor.keyBuilder();
185 
186     return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
187   }
188 
189   @Override
190   public void enableInstance(final String clusterName,
191                              final String instanceName,
192                              final boolean enabled)
193   {
194     String path =
195         PropertyPathConfig.getPath(PropertyType.CONFIGS,
196                                    clusterName,
197                                    ConfigScopeProperty.PARTICIPANT.toString(),
198                                    instanceName);
199 
200     ZkBaseDataAccessor<ZNRecord> baseAccessor =
201         new ZkBaseDataAccessor<ZNRecord>(_zkClient);
202     if (!baseAccessor.exists(path, 0))
203     {
204       throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
205           + ", instance config does not exist");
206     }
207 
208     baseAccessor.update(path, new DataUpdater<ZNRecord>()
209     {
210       @Override
211       public ZNRecord update(ZNRecord currentData)
212       {
213         if (currentData == null)
214         {
215           throw new HelixException("Cluster: " + clusterName + ", instance: "
216               + instanceName + ", participant config is null");
217         }
218 
219         InstanceConfig config = new InstanceConfig(currentData);
220         config.setInstanceEnabled(enabled);
221         return config.getRecord();
222       }
223     }, AccessOption.PERSISTENT);
224   }
225 
226   @Override
227   public void enablePartition(final boolean enabled,
228                               final String clusterName,
229                               final String instanceName,
230                               final String resourceName,
231                               final List<String> partitionNames)
232   {
233     String path =
234         PropertyPathConfig.getPath(PropertyType.CONFIGS,
235                                    clusterName,
236                                    ConfigScopeProperty.PARTICIPANT.toString(),
237                                    instanceName);
238 
239     ZkBaseDataAccessor<ZNRecord> baseAccessor =
240         new ZkBaseDataAccessor<ZNRecord>(_zkClient);
241 
242     // check instanceConfig exists
243     if (!baseAccessor.exists(path, 0))
244     {
245       throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
246           + ", instance config does not exist");
247     }
248 
249     // check resource exists
250     String idealStatePath =
251         PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName);
252 
253     ZNRecord idealStateRecord = null;
254     try
255     {
256       idealStateRecord = baseAccessor.get(idealStatePath, null, 0);
257     }
258     catch (ZkNoNodeException e)
259     {
260       // OK.
261     }
262 
263     // check resource exist. warn if not.
264     if (idealStateRecord == null)
265     {
266 //      throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName
267 //          + ", ideal state does not exist");
268       logger.warn("Disable partitions: " + partitionNames + " but Cluster: " + clusterName 
269           + ", resource: " + resourceName + " does not exists. probably disable it during ERROR->DROPPED transtition");
270 
271     } else {
272       // check partitions exist. warn if not
273       IdealState idealState = new IdealState(idealStateRecord);
274       for (String partitionName : partitionNames)
275       {
276         if ((idealState.getIdealStateMode() == IdealStateModeProperty.AUTO && idealState.getPreferenceList(partitionName) == null)
277             || (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED && idealState.getInstanceStateMap(partitionName) == null))
278         {
279           logger.warn("Cluster: " + clusterName + ", resource: " + resourceName
280               + ", partition: " + partitionName
281               + ", partition does not exist in ideal state");
282         }
283       }
284     }
285 
286     // update participantConfig
287     // could not use ZNRecordUpdater since it doesn't do listField merge/subtract
288     baseAccessor.update(path, new DataUpdater<ZNRecord>()
289     {
290       @Override
291       public ZNRecord update(ZNRecord currentData)
292       {
293         if (currentData == null)
294         {
295           throw new HelixException("Cluster: " + clusterName + ", instance: "
296               + instanceName + ", participant config is null");
297         }
298 
299         // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
300         List<String> list =
301             currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
302         Set<String> disabledPartitions = new HashSet<String>();
303         if (list != null)
304         {
305           disabledPartitions.addAll(list);
306         }
307 
308         if (enabled)
309         {
310           disabledPartitions.removeAll(partitionNames);
311         }
312         else
313         {
314           disabledPartitions.addAll(partitionNames);
315         }
316 
317         list = new ArrayList<String>(disabledPartitions);
318         Collections.sort(list);
319         currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(),
320                                  list);
321         return currentData;
322       }
323     },
324                         AccessOption.PERSISTENT);
325   }
326 
327   @Override
328   public void enableCluster(String clusterName, boolean enabled)
329   {
330     HelixDataAccessor accessor =
331         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
332     Builder keyBuilder = accessor.keyBuilder();
333 
334     if (enabled)
335     {
336       accessor.removeProperty(keyBuilder.pause());
337     }
338     else
339     {
340       accessor.createProperty(keyBuilder.pause(), new PauseSignal("pause"));
341     }
342   }
343 
344   @Override
345   public void resetPartition(String clusterName,
346                              String instanceName,
347                              String resourceName,
348                              List<String> partitionNames)
349   {
350     ZKHelixDataAccessor accessor =
351         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
352     Builder keyBuilder = accessor.keyBuilder();
353 
354     // check the instance is alive
355     LiveInstance liveInstance =
356         accessor.getProperty(keyBuilder.liveInstance(instanceName));
357     if (liveInstance == null)
358     {
359       throw new HelixException("Can't reset state for " + resourceName + "/"
360           + partitionNames + " on " + instanceName + ", because " + instanceName
361           + " is not alive");
362     }
363 
364     // check resource group exists
365     IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
366     if (idealState == null)
367     {
368       throw new HelixException("Can't reset state for " + resourceName + "/"
369           + partitionNames + " on " + instanceName + ", because " + resourceName
370           + " is not added");
371     }
372 
373     // check partition exists in resource group
374     Set<String> resetPartitionNames = new HashSet<String>(partitionNames);
375     if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
376     {
377       Set<String> partitions =
378           new HashSet<String>(idealState.getRecord().getMapFields().keySet());
379       if (!partitions.containsAll(resetPartitionNames))
380       {
381         throw new HelixException("Can't reset state for " + resourceName + "/"
382             + partitionNames + " on " + instanceName + ", because not all "
383             + partitionNames + " exist");
384       }
385     }
386     else
387     {
388       Set<String> partitions =
389           new HashSet<String>(idealState.getRecord().getListFields().keySet());
390       if (!partitions.containsAll(resetPartitionNames))
391       {
392         throw new HelixException("Can't reset state for " + resourceName + "/"
393             + partitionNames + " on " + instanceName + ", because not all "
394             + partitionNames + " exist");
395       }
396     }
397 
398     // check partition is in ERROR state
399     String sessionId = liveInstance.getSessionId();
400     CurrentState curState =
401         accessor.getProperty(keyBuilder.currentState(instanceName,
402                                                      sessionId,
403                                                      resourceName));
404     for (String partitionName : resetPartitionNames)
405     {
406       if (!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString()))
407       {
408         throw new HelixException("Can't reset state for " + resourceName + "/"
409             + partitionNames + " on " + instanceName + ", because not all "
410             + partitionNames + " are in ERROR state");
411       }
412     }
413 
414     // check stateModelDef exists and get initial state
415     String stateModelDef = idealState.getStateModelDefRef();
416     StateModelDefinition stateModel =
417         accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
418     if (stateModel == null)
419     {
420       throw new HelixException("Can't reset state for " + resourceName + "/"
421           + partitionNames + " on " + instanceName + ", because " + stateModelDef
422           + " is NOT found");
423     }
424 
425     // check there is no pending messages for the partitions exist
426     List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName));
427     for (Message message : messages)
428     {
429       if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
430           || !sessionId.equals(message.getTgtSessionId())
431           || !resourceName.equals(message.getResourceName())
432           || !resetPartitionNames.contains(message.getPartitionName()))
433       {
434         continue;
435       }
436 
437       throw new HelixException("Can't reset state for " + resourceName + "/"
438           + partitionNames + " on " + instanceName
439           + ", because a pending message exists: " + message);
440     }
441 
442     String adminName = null;
443     try
444     {
445       adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
446     }
447     catch (UnknownHostException e)
448     {
449       // can ignore it
450       logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
451       adminName = "UNKNOWN";
452     }
453 
454     List<Message> resetMessages = new ArrayList<Message>();
455     List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
456     for (String partitionName : resetPartitionNames)
457     {
458       // send ERROR to initialState message
459       String msgId = UUID.randomUUID().toString();
460       Message message = new Message(MessageType.STATE_TRANSITION, msgId);
461       message.setSrcName(adminName);
462       message.setTgtName(instanceName);
463       message.setMsgState(MessageState.NEW);
464       message.setPartitionName(partitionName);
465       message.setResourceName(resourceName);
466       message.setTgtSessionId(sessionId);
467       message.setStateModelDef(stateModelDef);
468       message.setFromState(HelixDefinedState.ERROR.toString());
469       message.setToState(stateModel.getInitialState());
470       message.setStateModelFactoryName(idealState.getStateModelFactoryName());
471 
472       resetMessages.add(message);
473       messageKeys.add(keyBuilder.message(instanceName, message.getId()));
474     }
475 
476     accessor.setChildren(messageKeys, resetMessages);
477   }
478 
479   @Override
480   public void resetInstance(String clusterName, List<String> instanceNames)
481   {
482     // TODO: not mp-safe
483     ZKHelixDataAccessor accessor =
484         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
485     Builder keyBuilder = accessor.keyBuilder();
486     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
487 
488     Set<String> resetInstanceNames = new HashSet<String>(instanceNames);
489     for (String instanceName : resetInstanceNames)
490     {
491       List<String> resetPartitionNames = new ArrayList<String>();
492       for (ExternalView extView : extViews)
493       {
494         Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields();
495         for (String partitionName : stateMap.keySet())
496         {
497           Map<String, String> instanceStateMap = stateMap.get(partitionName);
498 
499           if (instanceStateMap.containsKey(instanceName)
500               && instanceStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString()))
501           {
502             resetPartitionNames.add(partitionName);
503           }
504         }
505         resetPartition(clusterName,
506                        instanceName,
507                        extView.getResourceName(),
508                        resetPartitionNames);
509       }
510     }
511   }
512 
513   @Override
514   public void resetResource(String clusterName, List<String> resourceNames)
515   {
516     // TODO: not mp-safe
517     ZKHelixDataAccessor accessor =
518         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
519     Builder keyBuilder = accessor.keyBuilder();
520     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
521 
522     Set<String> resetResourceNames = new HashSet<String>(resourceNames);
523     for (ExternalView extView : extViews)
524     {
525       if (!resetResourceNames.contains(extView.getResourceName()))
526       {
527         continue;
528       }
529 
530       // instanceName -> list of resetPartitionNames
531       Map<String, List<String>> resetPartitionNames = new HashMap<String, List<String>>();
532 
533       Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields();
534       for (String partitionName : stateMap.keySet())
535       {
536         Map<String, String> instanceStateMap = stateMap.get(partitionName);
537         for (String instanceName : instanceStateMap.keySet())
538         {
539           if (instanceStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString()))
540           {
541             if (!resetPartitionNames.containsKey(instanceName))
542             {
543               resetPartitionNames.put(instanceName, new ArrayList<String>());
544             }
545             resetPartitionNames.get(instanceName).add(partitionName);
546           }
547         }
548       }
549 
550       for (String instanceName : resetPartitionNames.keySet())
551       {
552         resetPartition(clusterName,
553                        instanceName,
554                        extView.getResourceName(),
555                        resetPartitionNames.get(instanceName));
556       }
557     }
558   }
559 
560   @Override
561   public boolean addCluster(String clusterName){
562     return addCluster(clusterName, false);
563   }
564 
565   @Override
566   public boolean addCluster(String clusterName, boolean recreateIfExists)
567   {
568     String root = "/" + clusterName;
569 
570     if (_zkClient.exists(root))
571     {
572       if (recreateIfExists)
573       {
574         logger.warn("Root directory exists.Cleaning the root directory:" + root);
575         _zkClient.deleteRecursive(root);
576       }
577       else
578       {
579         logger.info("Cluster " + clusterName + " already exists");
580         return true;
581       }
582     }
583     try
584     {
585       _zkClient.createPersistent(root, true);
586     }
587     catch (Exception e) 
588     {
589       //some other process might have created the cluster
590       if(_zkClient.exists(root)){
591         return true;
592       }
593       logger.error("Error creating cluster:"+ clusterName,e);
594       return false;
595     }
596     try
597     {
598       createZKPaths(clusterName);
599     }
600     catch (Exception e) 
601     {
602       logger.error("Error creating cluster:"+ clusterName,e);
603       return false;
604     }
605     logger.info("Created cluster:"+ clusterName);
606     return true;
607   }
608   
609   private void createZKPaths(String clusterName){
610       String path;
611 
612     // IDEAL STATE
613     _zkClient.createPersistent(HelixUtil.getIdealStatePath(clusterName));
614     // CONFIGURATIONS
615     // _zkClient.createPersistent(HelixUtil.getConfigPath(clusterName));
616     path =
617         PropertyPathConfig.getPath(PropertyType.CONFIGS,
618                                    clusterName,
619                                    ConfigScopeProperty.CLUSTER.toString(),
620                                    clusterName);
621     _zkClient.createPersistent(path, true);
622     _zkClient.writeData(path, new ZNRecord(clusterName));
623     path =
624         PropertyPathConfig.getPath(PropertyType.CONFIGS,
625                                    clusterName,
626                                    ConfigScopeProperty.PARTICIPANT.toString());
627     _zkClient.createPersistent(path);
628     path =
629         PropertyPathConfig.getPath(PropertyType.CONFIGS,
630                                    clusterName,
631                                    ConfigScopeProperty.RESOURCE.toString());
632     _zkClient.createPersistent(path);
633     // PROPERTY STORE
634     path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
635     _zkClient.createPersistent(path);
636     // LIVE INSTANCES
637     _zkClient.createPersistent(HelixUtil.getLiveInstancesPath(clusterName));
638     // MEMBER INSTANCES
639     _zkClient.createPersistent(HelixUtil.getMemberInstancesPath(clusterName));
640     // External view
641     _zkClient.createPersistent(HelixUtil.getExternalViewPath(clusterName));
642     // State model definition
643     _zkClient.createPersistent(HelixUtil.getStateModelDefinitionPath(clusterName));
644 
645     // controller
646     _zkClient.createPersistent(HelixUtil.getControllerPath(clusterName));
647     path = PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName);
648     final ZNRecord emptyHistory = new ZNRecord(PropertyType.HISTORY.toString());
649     final List<String> emptyList = new ArrayList<String>();
650     emptyHistory.setListField(clusterName, emptyList);
651     _zkClient.createPersistent(path, emptyHistory);
652 
653     path = PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName);
654     _zkClient.createPersistent(path);
655 
656     path = PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName);
657     _zkClient.createPersistent(path);
658 
659     path = PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName);
660     _zkClient.createPersistent(path);
661   }
662 
663   @Override
664   public List<String> getInstancesInCluster(String clusterName)
665   {
666     String memberInstancesPath = HelixUtil.getMemberInstancesPath(clusterName);
667     return _zkClient.getChildren(memberInstancesPath);
668   }
669   
670   @Override
671   public List<String> getInstancesInClusterWithTag(String clusterName, String tag)
672   {
673     String memberInstancesPath = HelixUtil.getMemberInstancesPath(clusterName);
674     List<String> instances =  _zkClient.getChildren(memberInstancesPath);
675     List<String> result = new ArrayList<String>();
676     
677     ZKHelixDataAccessor accessor =
678         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
679     Builder keyBuilder = accessor.keyBuilder();
680     
681     for(String instanceName : instances)
682     {
683       InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
684       if(config.containsTag(tag))
685       {
686         result.add(instanceName);
687       }
688     }
689     return result;
690   }
691 
692   @Override
693   public void addResource(String clusterName,
694                           String resourceName,
695                           int partitions,
696                           String stateModelRef)
697   {
698     addResource(clusterName,
699                 resourceName,
700                 partitions,
701                 stateModelRef,
702                 IdealStateModeProperty.AUTO.toString(),
703                 0);
704   }
705 
706   @Override
707   public void addResource(String clusterName,
708                           String resourceName,
709                           int partitions,
710                           String stateModelRef,
711                           String idealStateMode)
712   {
713     addResource(clusterName, resourceName, partitions, stateModelRef, idealStateMode, 0);
714   }
715 
716   @Override
717   public void addResource(String clusterName, 
718                           String resourceName,
719                           IdealState idealstate)
720   {
721     String stateModelRef = idealstate.getStateModelDefRef();
722     String stateModelDefPath =
723       PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
724                                  clusterName,
725                                  stateModelRef);
726   if (!_zkClient.exists(stateModelDefPath))
727   {
728     throw new HelixException("State model " + stateModelRef
729         + " not found in the cluster STATEMODELDEFS path");
730   }
731 
732   String idealStatePath = HelixUtil.getIdealStatePath(clusterName);
733   String resourceIdealStatePath = idealStatePath + "/" + resourceName;
734   if (_zkClient.exists(resourceIdealStatePath))
735   {
736     throw new HelixException("Skip the operation. Resource ideal state directory already exists:"
737         + resourceIdealStatePath);
738   }
739 
740   ZKUtil.createChildren(_zkClient, idealStatePath, idealstate.getRecord()); 
741   }
742   
743   @Override
744   public void addResource(String clusterName,
745                           String resourceName,
746                           int partitions,
747                           String stateModelRef,
748                           String idealStateMode,
749                           int bucketSize)
750   {
751     addResource(clusterName, resourceName, partitions, stateModelRef, idealStateMode,
752         bucketSize, -1);
753 
754   }
755   
756   @Override
757   public void addResource(String clusterName, String resourceName,
758       int partitions, String stateModelRef, String idealStateMode,
759       int bucketSize, int maxPartitionsPerInstance)
760   {
761     if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
762     {
763       throw new HelixException("cluster " + clusterName + " is not setup yet");
764     }
765 
766     IdealStateModeProperty mode = IdealStateModeProperty.AUTO;
767     try
768     {
769       mode = IdealStateModeProperty.valueOf(idealStateMode);
770     }
771     catch (Exception e)
772     {
773       logger.error("", e);
774     }
775     IdealState idealState = new IdealState(resourceName);
776     idealState.setNumPartitions(partitions);
777     idealState.setStateModelDefRef(stateModelRef);
778     idealState.setIdealStateMode(mode.toString());
779     idealState.setReplicas("" + 0);
780     idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
781     if(maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE)
782     {
783       idealState.setMaxPartitionsPerInstance(maxPartitionsPerInstance);
784     }
785     if (bucketSize > 0)
786     {
787       idealState.setBucketSize(bucketSize);
788     }
789     addResource(clusterName, resourceName, idealState);
790   }
791 
792   @Override
793   public List<String> getClusters()
794   {
795     List<String> zkToplevelPathes = _zkClient.getChildren("/");
796     List<String> result = new ArrayList<String>();
797     for (String pathName : zkToplevelPathes)
798     {
799       if (ZKUtil.isClusterSetup(pathName, _zkClient))
800       {
801         result.add(pathName);
802       }
803     }
804     return result;
805   }
806 
807   @Override
808   public List<String> getResourcesInCluster(String clusterName)
809   {
810     return _zkClient.getChildren(HelixUtil.getIdealStatePath(clusterName));
811   }
812 
813   @Override
814   public IdealState getResourceIdealState(String clusterName, String resourceName)
815   {
816     ZKHelixDataAccessor accessor =
817         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
818     Builder keyBuilder = accessor.keyBuilder();
819 
820     return accessor.getProperty(keyBuilder.idealStates(resourceName));
821   }
822 
823   @Override
824   public void setResourceIdealState(String clusterName,
825                                     String resourceName,
826                                     IdealState idealState)
827   {
828     ZKHelixDataAccessor accessor =
829         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
830     Builder keyBuilder = accessor.keyBuilder();
831 
832     accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
833   }
834 
835   @Override
836   public ExternalView getResourceExternalView(String clusterName, String resourceName)
837   {
838     ZKHelixDataAccessor accessor =
839         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
840     Builder keyBuilder = accessor.keyBuilder();
841     return accessor.getProperty(keyBuilder.externalView(resourceName));
842   }
843 
844   @Override
845   public void addStateModelDef(String clusterName,
846                                String stateModelDef,
847                                StateModelDefinition stateModel)
848   {
849     if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
850     {
851       throw new HelixException("cluster " + clusterName + " is not setup yet");
852     }
853     String stateModelDefPath = HelixUtil.getStateModelDefinitionPath(clusterName);
854     String stateModelPath = stateModelDefPath + "/" + stateModelDef;
855     if (_zkClient.exists(stateModelPath))
856     {
857       logger.warn("Skip the operation.State Model directory exists:" + stateModelPath);
858       throw new HelixException("State model path " + stateModelPath + " already exists.");
859     }
860 
861     ZKHelixDataAccessor accessor =
862         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
863     Builder keyBuilder = accessor.keyBuilder();
864     accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel);
865   }
866 
867   @Override
868   public void dropResource(String clusterName, String resourceName)
869   {
870     ZKHelixDataAccessor accessor =
871         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
872     Builder keyBuilder = accessor.keyBuilder();
873 
874     accessor.removeProperty(keyBuilder.idealStates(resourceName));
875     accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
876   }
877 
878   @Override
879   public List<String> getStateModelDefs(String clusterName)
880   {
881     return _zkClient.getChildren(HelixUtil.getStateModelDefinitionPath(clusterName));
882   }
883 
884   @Override
885   public StateModelDefinition getStateModelDef(String clusterName, String stateModelName)
886   {
887     ZKHelixDataAccessor accessor =
888         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
889     Builder keyBuilder = accessor.keyBuilder();
890 
891     return accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
892   }
893 
894   @Override
895   public void addStat(String clusterName, final String statName)
896   {
897     if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
898     {
899       throw new HelixException("cluster " + clusterName + " is not setup yet");
900     }
901 
902     String persistentStatsPath =
903         PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
904     ZkBaseDataAccessor<ZNRecord> baseAccessor =
905         new ZkBaseDataAccessor<ZNRecord>(_zkClient);
906 
907     baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>()
908     {
909 
910       @Override
911       public ZNRecord update(ZNRecord statsRec)
912       {
913         if (statsRec == null)
914         {
915           // TODO: fix naming of this record, if it matters
916           statsRec = new ZNRecord(PersistentStats.nodeName);
917         }
918 
919         Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
920         Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
921         for (String newStat : newStatMap.keySet())
922         {
923           if (!currStatMap.containsKey(newStat))
924           {
925             currStatMap.put(newStat, newStatMap.get(newStat));
926           }
927         }
928         statsRec.setMapFields(currStatMap);
929 
930         return statsRec;
931       }
932     }, AccessOption.PERSISTENT);
933   }
934 
935   @Override
936   public void addAlert(final String clusterName, final String alertName)
937   {
938     if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
939     {
940       throw new HelixException("cluster " + clusterName + " is not setup yet");
941     }
942 
943     ZkBaseDataAccessor<ZNRecord> baseAccessor =
944         new ZkBaseDataAccessor<ZNRecord>(_zkClient);
945 
946     String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
947 
948     baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>()
949     {
950 
951       @Override
952       public ZNRecord update(ZNRecord alertsRec)
953       {
954         if (alertsRec == null)
955         {
956           // TODO: fix naming of this record, if it matters
957           alertsRec = new ZNRecord(Alerts.nodeName);
958 
959         }
960 
961         Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
962         StringBuilder newStatName = new StringBuilder();
963         Map<String, String> newAlertMap = new HashMap<String, String>();
964 
965         // use AlertsHolder to get map of new stats and map for this alert
966         AlertsHolder.parseAlert(alertName, newStatName, newAlertMap);
967 
968         // add stat
969         addStat(clusterName, newStatName.toString());
970 
971         // add alert
972         currAlertMap.put(alertName, newAlertMap);
973 
974         alertsRec.setMapFields(currAlertMap);
975 
976         return alertsRec;
977       }
978     }, AccessOption.PERSISTENT);
979   }
980 
981   @Override
982   public void dropCluster(String clusterName)
983   {
984     logger.info("Deleting cluster " + clusterName);
985     ZKHelixDataAccessor accessor =
986         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
987     Builder keyBuilder = accessor.keyBuilder();
988 
989     String root = "/" + clusterName;
990     if (accessor.getChildNames(keyBuilder.liveInstances()).size() > 0)
991     {
992       throw new HelixException("There are still live instances in the cluster, shut them down first.");
993     }
994 
995     if (accessor.getProperty(keyBuilder.controllerLeader()) != null)
996     {
997       throw new HelixException("There are still LEADER in the cluster, shut them down first.");
998     }
999 
1000     _zkClient.deleteRecursive(root);
1001   }
1002 
1003   @Override
1004   public void dropStat(String clusterName, final String statName)
1005   {
1006     if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
1007     {
1008       throw new HelixException("cluster " + clusterName + " is not setup yet");
1009     }
1010 
1011     String persistentStatsPath =
1012         PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
1013     ZkBaseDataAccessor<ZNRecord> baseAccessor =
1014         new ZkBaseDataAccessor<ZNRecord>(_zkClient);
1015 
1016     baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>()
1017     {
1018 
1019       @Override
1020       public ZNRecord update(ZNRecord statsRec)
1021       {
1022         if (statsRec == null)
1023         {
1024           throw new HelixException("No stats record in ZK, nothing to drop");
1025         }
1026 
1027         Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
1028         Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
1029 
1030         // delete each stat from stat map
1031         for (String newStat : newStatMap.keySet())
1032         {
1033           if (currStatMap.containsKey(newStat))
1034           {
1035             currStatMap.remove(newStat);
1036           }
1037         }
1038         statsRec.setMapFields(currStatMap);
1039 
1040         return statsRec;
1041       }
1042     }, AccessOption.PERSISTENT);
1043   }
1044 
1045   @Override
1046   public void dropAlert(String clusterName, final String alertName)
1047   {
1048     if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
1049     {
1050       throw new HelixException("cluster " + clusterName + " is not setup yet");
1051     }
1052 
1053     String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
1054 
1055     ZkBaseDataAccessor<ZNRecord> baseAccessor =
1056         new ZkBaseDataAccessor<ZNRecord>(_zkClient);
1057 
1058     if (!baseAccessor.exists(alertsPath, 0))
1059     {
1060       throw new HelixException("No alerts node in ZK, nothing to drop");
1061     }
1062 
1063     baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>()
1064     {
1065       @Override
1066       public ZNRecord update(ZNRecord alertsRec)
1067       {
1068         if (alertsRec == null)
1069         {
1070           throw new HelixException("No alerts record in ZK, nothing to drop");
1071         }
1072 
1073         Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
1074         currAlertMap.remove(alertName);
1075         alertsRec.setMapFields(currAlertMap);
1076 
1077         return alertsRec;
1078       }
1079     }, AccessOption.PERSISTENT);
1080   }
1081 
1082   @Override
1083   public void addClusterToGrandCluster(String clusterName, String grandCluster)
1084   {
1085     if (!ZKUtil.isClusterSetup(grandCluster, _zkClient))
1086     {
1087       throw new HelixException("Grand cluster " + grandCluster + " is not setup yet");
1088     }
1089 
1090     if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
1091     {
1092       throw new HelixException("Cluster " + clusterName + " is not setup yet");
1093     }
1094 
1095     IdealState idealState = new IdealState(clusterName);
1096 
1097     idealState.setNumPartitions(1);
1098     idealState.setStateModelDefRef("LeaderStandby");
1099 
1100     List<String> controllers = getInstancesInCluster(grandCluster);
1101     if (controllers.size() == 0)
1102     {
1103       throw new HelixException("Grand cluster " + grandCluster + " has no instances");
1104     }
1105     idealState.setReplicas(Integer.toString(controllers.size()));
1106     Collections.shuffle(controllers);
1107     idealState.getRecord().setListField(clusterName, controllers);
1108     idealState.setPartitionState(clusterName, controllers.get(0), "LEADER");
1109     for (int i = 1; i < controllers.size(); i++)
1110     {
1111       idealState.setPartitionState(clusterName, controllers.get(i), "STANDBY");
1112     }
1113 
1114     ZKHelixDataAccessor accessor =
1115         new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
1116     Builder keyBuilder = accessor.keyBuilder();
1117 
1118     accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
1119   }
1120 
1121   @Override
1122   public void setConfig(HelixConfigScope scope, Map<String, String> properties)
1123   {
1124     _configAccessor.set(scope, properties);
1125   }
1126 
1127   @Override
1128   public Map<String, String> getConfig(HelixConfigScope scope, List<String> keys)
1129   {
1130     return _configAccessor.get(scope, keys);
1131   }
1132 
1133   @Override
1134   public List<String> getConfigKeys(HelixConfigScope scope)
1135   {
1136     return _configAccessor.getKeys(scope);
1137   }
1138 
1139   @Override
1140   public void removeConfig(HelixConfigScope scope, List<String> keys)
1141   {
1142     _configAccessor.remove(scope, keys);
1143   }
1144 
1145   @Override
1146   public void rebalance(String clusterName, String resourceName, int replica)
1147   {
1148     rebalance(clusterName, resourceName, replica, resourceName, "");
1149   }
1150   
1151   @Override
1152   public void rebalance(String clusterName, String resourceName, int replica, String keyPrefix, String group)
1153   {
1154     List<String> instanceNames = new LinkedList<String>();
1155     if(keyPrefix == null || keyPrefix.length() == 0)
1156     {
1157       keyPrefix = resourceName;
1158     }
1159     if(group != null && group.length() > 0)
1160     {
1161       instanceNames = getInstancesInClusterWithTag(clusterName, group);
1162     }
1163     if(instanceNames.size() == 0)
1164     {
1165       logger.info("No tags found for resource " + resourceName + ", use all instances");
1166       instanceNames = getInstancesInCluster(clusterName);
1167       group = "";
1168     }
1169     else
1170     {
1171       logger.info("Found instances with tag for " + resourceName + " " + instanceNames);
1172     }
1173     rebalance(clusterName, resourceName, replica, keyPrefix, instanceNames, group);
1174   }
1175   
1176   @Override
1177   public void rebalance(String clusterName, String resourceName, int replica, List<String> instances)
1178   {
1179     rebalance(clusterName, resourceName, replica, resourceName, instances, "");
1180   }
1181   
1182   
1183   void rebalance(String clusterName, 
1184                  String resourceName, 
1185                  int replica, 
1186                  String keyPrefix, 
1187                  List<String> instanceNames,
1188                  String groupId)
1189   {
1190     // ensure we get the same idealState with the same set of instances
1191     Collections.sort(instanceNames);
1192 
1193     IdealState idealState = getResourceIdealState(clusterName, resourceName);
1194     if (idealState == null)
1195     {
1196       throw new HelixException("Resource: " + resourceName + " has NOT been added yet");
1197     }
1198 
1199     if(groupId != null && groupId.length() > 0)
1200     {
1201       idealState.setInstanceGroupTag(groupId);
1202     }
1203     idealState.setReplicas(Integer.toString(replica));
1204     int partitions = idealState.getNumPartitions();
1205     String stateModelName = idealState.getStateModelDefRef();
1206     StateModelDefinition stateModDef = getStateModelDef(clusterName, stateModelName);
1207 
1208     if (stateModDef == null)
1209     {
1210       throw new HelixException("cannot find state model: " + stateModelName);
1211     }
1212     // StateModelDefinition def = new StateModelDefinition(stateModDef);
1213 
1214     List<String> statePriorityList = stateModDef.getStatesPriorityList();
1215 
1216     String masterStateValue = null;
1217     String slaveStateValue = null;
1218     replica--;
1219 
1220     for (String state : statePriorityList)
1221     {
1222       String count = stateModDef.getNumInstancesPerState(state);
1223       if (count.equals("1"))
1224       {
1225         if (masterStateValue != null)
1226         {
1227           throw new HelixException("Invalid or unsupported state model definition");
1228         }
1229         masterStateValue = state;
1230       }
1231       else if (count.equalsIgnoreCase("R"))
1232       {
1233         if (slaveStateValue != null)
1234         {
1235           throw new HelixException("Invalid or unsupported state model definition");
1236         }
1237         slaveStateValue = state;
1238       }
1239       else if (count.equalsIgnoreCase("N"))
1240       {
1241         if (!(masterStateValue == null && slaveStateValue == null))
1242         {
1243           throw new HelixException("Invalid or unsupported state model definition");
1244         }
1245         replica = instanceNames.size() - 1;
1246         masterStateValue = slaveStateValue = state;
1247       }
1248     }
1249     if (masterStateValue == null && slaveStateValue == null)
1250     {
1251       throw new HelixException("Invalid or unsupported state model definition");
1252     }
1253 
1254     if (masterStateValue == null)
1255     {
1256       masterStateValue = slaveStateValue;
1257     }
1258     if (idealState.getIdealStateMode() != IdealStateModeProperty.AUTO_REBALANCE)
1259     {
1260       ZNRecord newIdealState =
1261           DefaultIdealStateCalculator.calculateIdealState(instanceNames,
1262                                                                  partitions,
1263                                                                  replica,
1264                                                                  keyPrefix,
1265                                                                  masterStateValue,
1266                                                                  slaveStateValue);
1267 
1268       // for now keep mapField in AUTO mode and remove listField in CUSTOMIZED mode
1269       if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO)
1270       {
1271         idealState.getRecord().setListFields(newIdealState.getListFields());
1272         idealState.getRecord().setMapFields(newIdealState.getMapFields());
1273       }
1274       if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
1275       {
1276         idealState.getRecord().setMapFields(newIdealState.getMapFields());
1277       }
1278     }
1279     else
1280     {
1281       for (int i = 0; i < partitions; i++)
1282       {
1283         String partitionName = keyPrefix + "_" + i;
1284         idealState.getRecord().setMapField(partitionName, new HashMap<String, String>());
1285         idealState.getRecord().setListField(partitionName, new ArrayList<String>());
1286       }
1287     }
1288     setResourceIdealState(clusterName, resourceName, idealState);
1289   }
1290 
1291   @Override
1292   public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException
1293   {
1294     ZNRecord idealStateRecord =
1295         (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateFile)));
1296     if (idealStateRecord.getId() == null
1297         || !idealStateRecord.getId().equals(resourceName))
1298     {
1299       throw new IllegalArgumentException("ideal state must have same id as resource name");
1300     }
1301     setResourceIdealState(clusterName, resourceName, new IdealState(idealStateRecord));
1302   }
1303 
1304   private static byte[] readFile(String filePath) throws IOException
1305   {
1306     File file = new File(filePath);
1307 
1308     int size = (int) file.length();
1309     byte[] bytes = new byte[size];
1310     DataInputStream dis = new DataInputStream(new FileInputStream(file));
1311     int read = 0;
1312     int numRead = 0;
1313     while (read < bytes.length
1314         && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0)
1315     {
1316       read = read + numRead;
1317     }
1318     return bytes;
1319   }
1320 
1321   public void addStateModelDef(String clusterName,
1322                                String stateModelDefName,
1323                                String stateModelDefFile) throws IOException
1324   {
1325     ZNRecord record =
1326         (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefFile)));
1327     if (record == null || record.getId() == null
1328         || !record.getId().equals(stateModelDefName))
1329     {
1330       throw new IllegalArgumentException("state model definition must have same id as state model def name");
1331     }
1332     addStateModelDef(clusterName, stateModelDefName, new StateModelDefinition(record));
1333 
1334   }
1335 
1336   @Override
1337   public void setConstraint(String clusterName,
1338                             final ConstraintType constraintType,
1339                             final String constraintId,
1340                             final ConstraintItem constraintItem)
1341   {
1342     ZkBaseDataAccessor<ZNRecord> baseAccessor =
1343         new ZkBaseDataAccessor<ZNRecord>(_zkClient);
1344 
1345     Builder keyBuilder = new Builder(clusterName);
1346     String path = keyBuilder.constraint(constraintType.toString()).getPath();
1347 
1348     baseAccessor.update(path, new DataUpdater<ZNRecord>()
1349     {
1350       @Override
1351       public ZNRecord update(ZNRecord currentData)
1352       {
1353         ClusterConstraints constraints = currentData == null? 
1354             new ClusterConstraints(constraintType) : new ClusterConstraints(currentData);
1355 
1356         constraints.addConstraintItem(constraintId, constraintItem);
1357         return constraints.getRecord();
1358       }
1359     }, AccessOption.PERSISTENT);
1360   }
1361   
1362   @Override
1363   public void removeConstraint(String clusterName, 
1364                                final ConstraintType constraintType,
1365                                final String constraintId)
1366   {
1367     ZkBaseDataAccessor<ZNRecord> baseAccessor =
1368         new ZkBaseDataAccessor<ZNRecord>(_zkClient);
1369 
1370     Builder keyBuilder = new Builder(clusterName);
1371     String path = keyBuilder.constraint(constraintType.toString()).getPath();
1372 
1373     baseAccessor.update(path, new DataUpdater<ZNRecord>()
1374     {
1375       @Override
1376       public ZNRecord update(ZNRecord currentData)
1377       {
1378         if (currentData != null) {
1379           ClusterConstraints constraints = new ClusterConstraints(currentData);
1380 
1381           constraints.removeConstraintItem(constraintId);
1382           return constraints.getRecord();
1383         }
1384         return null;
1385       }
1386     }, AccessOption.PERSISTENT);
1387   }
1388   
1389   @Override
1390   public ClusterConstraints getConstraints(String clusterName, ConstraintType constraintType)
1391   {
1392     HelixDataAccessor accessor =
1393         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
1394 
1395     Builder keyBuilder = new Builder(clusterName);
1396     return accessor.getProperty(keyBuilder.constraint(constraintType.toString()));
1397   }
1398   
1399   /**
1400    * Takes the existing idealstate as input and computes newIdealState such that 
1401    * the partition movement is minimized. The partitions are redistributed among the instances provided.
1402    * @param clusterName 
1403    * @param currentIdealState
1404    * @param instanceNames
1405    * @return
1406    */
1407   @Override
1408   public void rebalance(String clusterName,
1409                               IdealState currentIdealState, 
1410                               List<String> instanceNames)
1411   {
1412     Set<String> activeInstances = new HashSet<String>();
1413     for (String partition : currentIdealState.getPartitionSet())
1414     {
1415       activeInstances.addAll(currentIdealState.getRecord().getListField(partition));
1416     }
1417     instanceNames.removeAll(activeInstances);
1418     Map<String, Object> previousIdealState = RebalanceUtil.buildInternalIdealState(currentIdealState);
1419 
1420     Map<String, Object> balancedRecord =
1421         DefaultIdealStateCalculator.calculateNextIdealState(instanceNames,
1422                                                                    previousIdealState);
1423     StateModelDefinition stateModDef =
1424         this.getStateModelDef(clusterName, currentIdealState.getStateModelDefRef());
1425 
1426     if (stateModDef == null)
1427     {
1428       throw new HelixException("cannot find state model: " + currentIdealState.getStateModelDefRef());
1429     }
1430     String[] states = RebalanceUtil.parseStates(clusterName, stateModDef);
1431 
1432     ZNRecord newIdealStateRecord =
1433         DefaultIdealStateCalculator.convertToZNRecord(balancedRecord,
1434                                                              currentIdealState.getResourceName(),
1435                                                              states[0],
1436                                                              states[1]);
1437     Set<String> partitionSet = new HashSet<String>();
1438     partitionSet.addAll(newIdealStateRecord.getMapFields().keySet());
1439     partitionSet.addAll(newIdealStateRecord.getListFields().keySet());
1440 
1441     Map<String, String> reversePartitionIndex =
1442         (Map<String, String>) balancedRecord.get("reversePartitionIndex");
1443     for (String partition : partitionSet)
1444     {
1445       if (reversePartitionIndex.containsKey(partition))
1446       {
1447         String originPartitionName = reversePartitionIndex.get(partition);
1448         if (partition.equals(originPartitionName))
1449         {
1450           continue;
1451         }
1452         newIdealStateRecord.getMapFields()
1453                            .put(originPartitionName,
1454                                 newIdealStateRecord.getMapField(partition));
1455         newIdealStateRecord.getMapFields().remove(partition);
1456 
1457         newIdealStateRecord.getListFields()
1458                            .put(originPartitionName,
1459                                 newIdealStateRecord.getListField(partition));
1460         newIdealStateRecord.getListFields().remove(partition);
1461       }
1462     }
1463 
1464     newIdealStateRecord.getSimpleFields()
1465                        .putAll(currentIdealState.getRecord().getSimpleFields());
1466     IdealState newIdealState = new IdealState(newIdealStateRecord);
1467     setResourceIdealState(clusterName, newIdealStateRecord.getId(), newIdealState);
1468   }
1469 
1470   @Override
1471   public void addInstanceTag(String clusterName, String instanceName, String tag)
1472   {
1473     if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
1474     {
1475       throw new HelixException("cluster " + clusterName + " is not setup yet");
1476     }
1477     
1478     if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT))
1479     {
1480       throw new HelixException("cluster " + clusterName + " instance "+instanceName +" is not setup yet");
1481     }
1482     ZKHelixDataAccessor accessor =
1483         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
1484     Builder keyBuilder = accessor.keyBuilder();
1485 
1486     InstanceConfig config =  accessor.getProperty(keyBuilder.instanceConfig(instanceName));
1487     config.addTag(tag);
1488     accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
1489   }
1490 
1491   @Override
1492   public void removeInstanceTag(String clusterName, String instanceName,
1493       String tag)
1494   {
1495     if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
1496     {
1497       throw new HelixException("cluster " + clusterName + " is not setup yet");
1498     }
1499     
1500     if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT))
1501     {
1502       throw new HelixException("cluster " + clusterName + " instance "+instanceName +" is not setup yet");
1503     }
1504     ZKHelixDataAccessor accessor =
1505         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
1506     Builder keyBuilder = accessor.keyBuilder();
1507     
1508     InstanceConfig config =  accessor.getProperty(keyBuilder.instanceConfig(instanceName));
1509     config.removeTag(tag);
1510     accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
1511   }
1512 
1513   public void close()
1514   {
1515     if(_zkClient!=null)
1516     {
1517     _zkClient.close();
1518     }
1519   }
1520  
1521 }