1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.DataInputStream;
23 import java.io.File;
24 import java.io.FileInputStream;
25 import java.io.IOException;
26 import java.net.InetAddress;
27 import java.net.UnknownHostException;
28 import java.util.ArrayList;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.TreeMap;
37 import java.util.UUID;
38 import java.util.concurrent.TimeUnit;
39
40 import org.I0Itec.zkclient.DataUpdater;
41 import org.I0Itec.zkclient.exception.ZkNoNodeException;
42 import org.apache.helix.AccessOption;
43 import org.apache.helix.ConfigAccessor;
44 import org.apache.helix.HelixAdmin;
45 import org.apache.helix.HelixConstants;
46 import org.apache.helix.HelixDataAccessor;
47 import org.apache.helix.HelixDefinedState;
48 import org.apache.helix.HelixException;
49 import org.apache.helix.InstanceType;
50 import org.apache.helix.PropertyKey;
51 import org.apache.helix.PropertyKey.Builder;
52 import org.apache.helix.PropertyPathConfig;
53 import org.apache.helix.PropertyType;
54 import org.apache.helix.ZNRecord;
55 import org.apache.helix.alerts.AlertsHolder;
56 import org.apache.helix.alerts.StatsHolder;
57 import org.apache.helix.model.Alerts;
58 import org.apache.helix.model.ClusterConstraints;
59 import org.apache.helix.model.ClusterConstraints.ConstraintType;
60 import org.apache.helix.model.ConfigScope;
61 import org.apache.helix.model.ConstraintItem;
62 import org.apache.helix.model.CurrentState;
63 import org.apache.helix.model.ExternalView;
64 import org.apache.helix.model.HelixConfigScope;
65 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
66 import org.apache.helix.model.IdealState;
67 import org.apache.helix.model.IdealState.IdealStateModeProperty;
68 import org.apache.helix.model.InstanceConfig;
69 import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
70 import org.apache.helix.model.LiveInstance;
71 import org.apache.helix.model.Message;
72 import org.apache.helix.model.Message.MessageState;
73 import org.apache.helix.model.Message.MessageType;
74 import org.apache.helix.model.PauseSignal;
75 import org.apache.helix.model.PersistentStats;
76 import org.apache.helix.model.StateModelDefinition;
77 import org.apache.helix.tools.DefaultIdealStateCalculator;
78 import org.apache.helix.util.HelixUtil;
79 import org.apache.helix.util.RebalanceUtil;
80 import org.apache.log4j.Logger;
81
82
83 public class ZKHelixAdmin implements HelixAdmin
84 {
85
86 private final ZkClient _zkClient;
87 private final ConfigAccessor _configAccessor;
88
89 private static Logger logger = Logger.getLogger(ZKHelixAdmin.class);
90
/**
 * Creates an admin on top of an existing ZooKeeper client.
 *
 * @param zkClient client used for all ZooKeeper access; it is also handed to the
 *                 {@link ConfigAccessor} created here
 */
public ZKHelixAdmin(ZkClient zkClient)
{
  this._zkClient = zkClient;
  this._configAccessor = new ConfigAccessor(this._zkClient);
}
96
97 public ZKHelixAdmin(String zkAddress)
98 {
99 _zkClient = new ZkClient(zkAddress);
100 _zkClient.setZkSerializer(new ZNRecordSerializer());
101 _zkClient.waitUntilConnected(30, TimeUnit.SECONDS);
102 _configAccessor = new ConfigAccessor(_zkClient);
103 }
104
105 @Override
106 public void addInstance(String clusterName, InstanceConfig instanceConfig)
107 {
108 if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
109 {
110 throw new HelixException("cluster " + clusterName + " is not setup yet");
111 }
112 String instanceConfigsPath =
113 PropertyPathConfig.getPath(PropertyType.CONFIGS,
114 clusterName,
115 ConfigScopeProperty.PARTICIPANT.toString());
116 String nodeId = instanceConfig.getId();
117 String instanceConfigPath = instanceConfigsPath + "/" + nodeId;
118
119 if (_zkClient.exists(instanceConfigPath))
120 {
121 throw new HelixException("Node " + nodeId + " already exists in cluster "
122 + clusterName);
123 }
124
125 ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
126
127 _zkClient.createPersistent(HelixUtil.getMessagePath(clusterName, nodeId), true);
128 _zkClient.createPersistent(HelixUtil.getCurrentStateBasePath(clusterName, nodeId),
129 true);
130 _zkClient.createPersistent(HelixUtil.getErrorsPath(clusterName, nodeId), true);
131 _zkClient.createPersistent(HelixUtil.getStatusUpdatesPath(clusterName, nodeId), true);
132 }
133
134 @Override
135 public void dropInstance(String clusterName, InstanceConfig instanceConfig)
136 {
137
138 String instanceConfigsPath =
139 PropertyPathConfig.getPath(PropertyType.CONFIGS,
140 clusterName,
141 ConfigScopeProperty.PARTICIPANT.toString());
142 String nodeId = instanceConfig.getId();
143 String instanceConfigPath = instanceConfigsPath + "/" + nodeId;
144 String instancePath = HelixUtil.getInstancePath(clusterName, nodeId);
145
146 if (!_zkClient.exists(instanceConfigPath))
147 {
148 throw new HelixException("Node " + nodeId
149 + " does not exist in config for cluster " + clusterName);
150 }
151
152 if (!_zkClient.exists(instancePath))
153 {
154 throw new HelixException("Node " + nodeId
155 + " does not exist in instances for cluster " + clusterName);
156 }
157
158
159 ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
160
161
162 _zkClient.deleteRecursive(instancePath);
163 }
164
165 @Override
166 public InstanceConfig getInstanceConfig(String clusterName, String instanceName)
167 {
168
169
170
171 String instanceConfigPath =
172 PropertyPathConfig.getPath(PropertyType.CONFIGS,
173 clusterName,
174 ConfigScopeProperty.PARTICIPANT.toString(),
175 instanceName);
176 if (!_zkClient.exists(instanceConfigPath))
177 {
178 throw new HelixException("instance" + instanceName + " does not exist in cluster "
179 + clusterName);
180 }
181
182 ZKHelixDataAccessor accessor =
183 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
184 Builder keyBuilder = accessor.keyBuilder();
185
186 return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
187 }
188
189 @Override
190 public void enableInstance(final String clusterName,
191 final String instanceName,
192 final boolean enabled)
193 {
194 String path =
195 PropertyPathConfig.getPath(PropertyType.CONFIGS,
196 clusterName,
197 ConfigScopeProperty.PARTICIPANT.toString(),
198 instanceName);
199
200 ZkBaseDataAccessor<ZNRecord> baseAccessor =
201 new ZkBaseDataAccessor<ZNRecord>(_zkClient);
202 if (!baseAccessor.exists(path, 0))
203 {
204 throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
205 + ", instance config does not exist");
206 }
207
208 baseAccessor.update(path, new DataUpdater<ZNRecord>()
209 {
210 @Override
211 public ZNRecord update(ZNRecord currentData)
212 {
213 if (currentData == null)
214 {
215 throw new HelixException("Cluster: " + clusterName + ", instance: "
216 + instanceName + ", participant config is null");
217 }
218
219 InstanceConfig config = new InstanceConfig(currentData);
220 config.setInstanceEnabled(enabled);
221 return config.getRecord();
222 }
223 }, AccessOption.PERSISTENT);
224 }
225
226 @Override
227 public void enablePartition(final boolean enabled,
228 final String clusterName,
229 final String instanceName,
230 final String resourceName,
231 final List<String> partitionNames)
232 {
233 String path =
234 PropertyPathConfig.getPath(PropertyType.CONFIGS,
235 clusterName,
236 ConfigScopeProperty.PARTICIPANT.toString(),
237 instanceName);
238
239 ZkBaseDataAccessor<ZNRecord> baseAccessor =
240 new ZkBaseDataAccessor<ZNRecord>(_zkClient);
241
242
243 if (!baseAccessor.exists(path, 0))
244 {
245 throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
246 + ", instance config does not exist");
247 }
248
249
250 String idealStatePath =
251 PropertyPathConfig.getPath(PropertyType.IDEALSTATES, clusterName, resourceName);
252
253 ZNRecord idealStateRecord = null;
254 try
255 {
256 idealStateRecord = baseAccessor.get(idealStatePath, null, 0);
257 }
258 catch (ZkNoNodeException e)
259 {
260
261 }
262
263
264 if (idealStateRecord == null)
265 {
266
267
268 logger.warn("Disable partitions: " + partitionNames + " but Cluster: " + clusterName
269 + ", resource: " + resourceName + " does not exists. probably disable it during ERROR->DROPPED transtition");
270
271 } else {
272
273 IdealState idealState = new IdealState(idealStateRecord);
274 for (String partitionName : partitionNames)
275 {
276 if ((idealState.getIdealStateMode() == IdealStateModeProperty.AUTO && idealState.getPreferenceList(partitionName) == null)
277 || (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED && idealState.getInstanceStateMap(partitionName) == null))
278 {
279 logger.warn("Cluster: " + clusterName + ", resource: " + resourceName
280 + ", partition: " + partitionName
281 + ", partition does not exist in ideal state");
282 }
283 }
284 }
285
286
287
288 baseAccessor.update(path, new DataUpdater<ZNRecord>()
289 {
290 @Override
291 public ZNRecord update(ZNRecord currentData)
292 {
293 if (currentData == null)
294 {
295 throw new HelixException("Cluster: " + clusterName + ", instance: "
296 + instanceName + ", participant config is null");
297 }
298
299
300 List<String> list =
301 currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
302 Set<String> disabledPartitions = new HashSet<String>();
303 if (list != null)
304 {
305 disabledPartitions.addAll(list);
306 }
307
308 if (enabled)
309 {
310 disabledPartitions.removeAll(partitionNames);
311 }
312 else
313 {
314 disabledPartitions.addAll(partitionNames);
315 }
316
317 list = new ArrayList<String>(disabledPartitions);
318 Collections.sort(list);
319 currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(),
320 list);
321 return currentData;
322 }
323 },
324 AccessOption.PERSISTENT);
325 }
326
327 @Override
328 public void enableCluster(String clusterName, boolean enabled)
329 {
330 HelixDataAccessor accessor =
331 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
332 Builder keyBuilder = accessor.keyBuilder();
333
334 if (enabled)
335 {
336 accessor.removeProperty(keyBuilder.pause());
337 }
338 else
339 {
340 accessor.createProperty(keyBuilder.pause(), new PauseSignal("pause"));
341 }
342 }
343
344 @Override
345 public void resetPartition(String clusterName,
346 String instanceName,
347 String resourceName,
348 List<String> partitionNames)
349 {
350 ZKHelixDataAccessor accessor =
351 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
352 Builder keyBuilder = accessor.keyBuilder();
353
354
355 LiveInstance liveInstance =
356 accessor.getProperty(keyBuilder.liveInstance(instanceName));
357 if (liveInstance == null)
358 {
359 throw new HelixException("Can't reset state for " + resourceName + "/"
360 + partitionNames + " on " + instanceName + ", because " + instanceName
361 + " is not alive");
362 }
363
364
365 IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
366 if (idealState == null)
367 {
368 throw new HelixException("Can't reset state for " + resourceName + "/"
369 + partitionNames + " on " + instanceName + ", because " + resourceName
370 + " is not added");
371 }
372
373
374 Set<String> resetPartitionNames = new HashSet<String>(partitionNames);
375 if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
376 {
377 Set<String> partitions =
378 new HashSet<String>(idealState.getRecord().getMapFields().keySet());
379 if (!partitions.containsAll(resetPartitionNames))
380 {
381 throw new HelixException("Can't reset state for " + resourceName + "/"
382 + partitionNames + " on " + instanceName + ", because not all "
383 + partitionNames + " exist");
384 }
385 }
386 else
387 {
388 Set<String> partitions =
389 new HashSet<String>(idealState.getRecord().getListFields().keySet());
390 if (!partitions.containsAll(resetPartitionNames))
391 {
392 throw new HelixException("Can't reset state for " + resourceName + "/"
393 + partitionNames + " on " + instanceName + ", because not all "
394 + partitionNames + " exist");
395 }
396 }
397
398
399 String sessionId = liveInstance.getSessionId();
400 CurrentState curState =
401 accessor.getProperty(keyBuilder.currentState(instanceName,
402 sessionId,
403 resourceName));
404 for (String partitionName : resetPartitionNames)
405 {
406 if (!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString()))
407 {
408 throw new HelixException("Can't reset state for " + resourceName + "/"
409 + partitionNames + " on " + instanceName + ", because not all "
410 + partitionNames + " are in ERROR state");
411 }
412 }
413
414
415 String stateModelDef = idealState.getStateModelDefRef();
416 StateModelDefinition stateModel =
417 accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
418 if (stateModel == null)
419 {
420 throw new HelixException("Can't reset state for " + resourceName + "/"
421 + partitionNames + " on " + instanceName + ", because " + stateModelDef
422 + " is NOT found");
423 }
424
425
426 List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName));
427 for (Message message : messages)
428 {
429 if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())
430 || !sessionId.equals(message.getTgtSessionId())
431 || !resourceName.equals(message.getResourceName())
432 || !resetPartitionNames.contains(message.getPartitionName()))
433 {
434 continue;
435 }
436
437 throw new HelixException("Can't reset state for " + resourceName + "/"
438 + partitionNames + " on " + instanceName
439 + ", because a pending message exists: " + message);
440 }
441
442 String adminName = null;
443 try
444 {
445 adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
446 }
447 catch (UnknownHostException e)
448 {
449
450 logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
451 adminName = "UNKNOWN";
452 }
453
454 List<Message> resetMessages = new ArrayList<Message>();
455 List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
456 for (String partitionName : resetPartitionNames)
457 {
458
459 String msgId = UUID.randomUUID().toString();
460 Message message = new Message(MessageType.STATE_TRANSITION, msgId);
461 message.setSrcName(adminName);
462 message.setTgtName(instanceName);
463 message.setMsgState(MessageState.NEW);
464 message.setPartitionName(partitionName);
465 message.setResourceName(resourceName);
466 message.setTgtSessionId(sessionId);
467 message.setStateModelDef(stateModelDef);
468 message.setFromState(HelixDefinedState.ERROR.toString());
469 message.setToState(stateModel.getInitialState());
470 message.setStateModelFactoryName(idealState.getStateModelFactoryName());
471
472 resetMessages.add(message);
473 messageKeys.add(keyBuilder.message(instanceName, message.getId()));
474 }
475
476 accessor.setChildren(messageKeys, resetMessages);
477 }
478
479 @Override
480 public void resetInstance(String clusterName, List<String> instanceNames)
481 {
482
483 ZKHelixDataAccessor accessor =
484 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
485 Builder keyBuilder = accessor.keyBuilder();
486 List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
487
488 Set<String> resetInstanceNames = new HashSet<String>(instanceNames);
489 for (String instanceName : resetInstanceNames)
490 {
491 List<String> resetPartitionNames = new ArrayList<String>();
492 for (ExternalView extView : extViews)
493 {
494 Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields();
495 for (String partitionName : stateMap.keySet())
496 {
497 Map<String, String> instanceStateMap = stateMap.get(partitionName);
498
499 if (instanceStateMap.containsKey(instanceName)
500 && instanceStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString()))
501 {
502 resetPartitionNames.add(partitionName);
503 }
504 }
505 resetPartition(clusterName,
506 instanceName,
507 extView.getResourceName(),
508 resetPartitionNames);
509 }
510 }
511 }
512
513 @Override
514 public void resetResource(String clusterName, List<String> resourceNames)
515 {
516
517 ZKHelixDataAccessor accessor =
518 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
519 Builder keyBuilder = accessor.keyBuilder();
520 List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
521
522 Set<String> resetResourceNames = new HashSet<String>(resourceNames);
523 for (ExternalView extView : extViews)
524 {
525 if (!resetResourceNames.contains(extView.getResourceName()))
526 {
527 continue;
528 }
529
530
531 Map<String, List<String>> resetPartitionNames = new HashMap<String, List<String>>();
532
533 Map<String, Map<String, String>> stateMap = extView.getRecord().getMapFields();
534 for (String partitionName : stateMap.keySet())
535 {
536 Map<String, String> instanceStateMap = stateMap.get(partitionName);
537 for (String instanceName : instanceStateMap.keySet())
538 {
539 if (instanceStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString()))
540 {
541 if (!resetPartitionNames.containsKey(instanceName))
542 {
543 resetPartitionNames.put(instanceName, new ArrayList<String>());
544 }
545 resetPartitionNames.get(instanceName).add(partitionName);
546 }
547 }
548 }
549
550 for (String instanceName : resetPartitionNames.keySet())
551 {
552 resetPartition(clusterName,
553 instanceName,
554 extView.getResourceName(),
555 resetPartitionNames.get(instanceName));
556 }
557 }
558 }
559
560 @Override
561 public boolean addCluster(String clusterName){
562 return addCluster(clusterName, false);
563 }
564
565 @Override
566 public boolean addCluster(String clusterName, boolean recreateIfExists)
567 {
568 String root = "/" + clusterName;
569
570 if (_zkClient.exists(root))
571 {
572 if (recreateIfExists)
573 {
574 logger.warn("Root directory exists.Cleaning the root directory:" + root);
575 _zkClient.deleteRecursive(root);
576 }
577 else
578 {
579 logger.info("Cluster " + clusterName + " already exists");
580 return true;
581 }
582 }
583 try
584 {
585 _zkClient.createPersistent(root, true);
586 }
587 catch (Exception e)
588 {
589
590 if(_zkClient.exists(root)){
591 return true;
592 }
593 logger.error("Error creating cluster:"+ clusterName,e);
594 return false;
595 }
596 try
597 {
598 createZKPaths(clusterName);
599 }
600 catch (Exception e)
601 {
602 logger.error("Error creating cluster:"+ clusterName,e);
603 return false;
604 }
605 logger.info("Created cluster:"+ clusterName);
606 return true;
607 }
608
609 private void createZKPaths(String clusterName){
610 String path;
611
612
613 _zkClient.createPersistent(HelixUtil.getIdealStatePath(clusterName));
614
615
616 path =
617 PropertyPathConfig.getPath(PropertyType.CONFIGS,
618 clusterName,
619 ConfigScopeProperty.CLUSTER.toString(),
620 clusterName);
621 _zkClient.createPersistent(path, true);
622 _zkClient.writeData(path, new ZNRecord(clusterName));
623 path =
624 PropertyPathConfig.getPath(PropertyType.CONFIGS,
625 clusterName,
626 ConfigScopeProperty.PARTICIPANT.toString());
627 _zkClient.createPersistent(path);
628 path =
629 PropertyPathConfig.getPath(PropertyType.CONFIGS,
630 clusterName,
631 ConfigScopeProperty.RESOURCE.toString());
632 _zkClient.createPersistent(path);
633
634 path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
635 _zkClient.createPersistent(path);
636
637 _zkClient.createPersistent(HelixUtil.getLiveInstancesPath(clusterName));
638
639 _zkClient.createPersistent(HelixUtil.getMemberInstancesPath(clusterName));
640
641 _zkClient.createPersistent(HelixUtil.getExternalViewPath(clusterName));
642
643 _zkClient.createPersistent(HelixUtil.getStateModelDefinitionPath(clusterName));
644
645
646 _zkClient.createPersistent(HelixUtil.getControllerPath(clusterName));
647 path = PropertyPathConfig.getPath(PropertyType.HISTORY, clusterName);
648 final ZNRecord emptyHistory = new ZNRecord(PropertyType.HISTORY.toString());
649 final List<String> emptyList = new ArrayList<String>();
650 emptyHistory.setListField(clusterName, emptyList);
651 _zkClient.createPersistent(path, emptyHistory);
652
653 path = PropertyPathConfig.getPath(PropertyType.MESSAGES_CONTROLLER, clusterName);
654 _zkClient.createPersistent(path);
655
656 path = PropertyPathConfig.getPath(PropertyType.STATUSUPDATES_CONTROLLER, clusterName);
657 _zkClient.createPersistent(path);
658
659 path = PropertyPathConfig.getPath(PropertyType.ERRORS_CONTROLLER, clusterName);
660 _zkClient.createPersistent(path);
661 }
662
663 @Override
664 public List<String> getInstancesInCluster(String clusterName)
665 {
666 String memberInstancesPath = HelixUtil.getMemberInstancesPath(clusterName);
667 return _zkClient.getChildren(memberInstancesPath);
668 }
669
670 @Override
671 public List<String> getInstancesInClusterWithTag(String clusterName, String tag)
672 {
673 String memberInstancesPath = HelixUtil.getMemberInstancesPath(clusterName);
674 List<String> instances = _zkClient.getChildren(memberInstancesPath);
675 List<String> result = new ArrayList<String>();
676
677 ZKHelixDataAccessor accessor =
678 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
679 Builder keyBuilder = accessor.keyBuilder();
680
681 for(String instanceName : instances)
682 {
683 InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
684 if(config.containsTag(tag))
685 {
686 result.add(instanceName);
687 }
688 }
689 return result;
690 }
691
692 @Override
693 public void addResource(String clusterName,
694 String resourceName,
695 int partitions,
696 String stateModelRef)
697 {
698 addResource(clusterName,
699 resourceName,
700 partitions,
701 stateModelRef,
702 IdealStateModeProperty.AUTO.toString(),
703 0);
704 }
705
706 @Override
707 public void addResource(String clusterName,
708 String resourceName,
709 int partitions,
710 String stateModelRef,
711 String idealStateMode)
712 {
713 addResource(clusterName, resourceName, partitions, stateModelRef, idealStateMode, 0);
714 }
715
716 @Override
717 public void addResource(String clusterName,
718 String resourceName,
719 IdealState idealstate)
720 {
721 String stateModelRef = idealstate.getStateModelDefRef();
722 String stateModelDefPath =
723 PropertyPathConfig.getPath(PropertyType.STATEMODELDEFS,
724 clusterName,
725 stateModelRef);
726 if (!_zkClient.exists(stateModelDefPath))
727 {
728 throw new HelixException("State model " + stateModelRef
729 + " not found in the cluster STATEMODELDEFS path");
730 }
731
732 String idealStatePath = HelixUtil.getIdealStatePath(clusterName);
733 String resourceIdealStatePath = idealStatePath + "/" + resourceName;
734 if (_zkClient.exists(resourceIdealStatePath))
735 {
736 throw new HelixException("Skip the operation. Resource ideal state directory already exists:"
737 + resourceIdealStatePath);
738 }
739
740 ZKUtil.createChildren(_zkClient, idealStatePath, idealstate.getRecord());
741 }
742
743 @Override
744 public void addResource(String clusterName,
745 String resourceName,
746 int partitions,
747 String stateModelRef,
748 String idealStateMode,
749 int bucketSize)
750 {
751 addResource(clusterName, resourceName, partitions, stateModelRef, idealStateMode,
752 bucketSize, -1);
753
754 }
755
756 @Override
757 public void addResource(String clusterName, String resourceName,
758 int partitions, String stateModelRef, String idealStateMode,
759 int bucketSize, int maxPartitionsPerInstance)
760 {
761 if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
762 {
763 throw new HelixException("cluster " + clusterName + " is not setup yet");
764 }
765
766 IdealStateModeProperty mode = IdealStateModeProperty.AUTO;
767 try
768 {
769 mode = IdealStateModeProperty.valueOf(idealStateMode);
770 }
771 catch (Exception e)
772 {
773 logger.error("", e);
774 }
775 IdealState idealState = new IdealState(resourceName);
776 idealState.setNumPartitions(partitions);
777 idealState.setStateModelDefRef(stateModelRef);
778 idealState.setIdealStateMode(mode.toString());
779 idealState.setReplicas("" + 0);
780 idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
781 if(maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE)
782 {
783 idealState.setMaxPartitionsPerInstance(maxPartitionsPerInstance);
784 }
785 if (bucketSize > 0)
786 {
787 idealState.setBucketSize(bucketSize);
788 }
789 addResource(clusterName, resourceName, idealState);
790 }
791
792 @Override
793 public List<String> getClusters()
794 {
795 List<String> zkToplevelPathes = _zkClient.getChildren("/");
796 List<String> result = new ArrayList<String>();
797 for (String pathName : zkToplevelPathes)
798 {
799 if (ZKUtil.isClusterSetup(pathName, _zkClient))
800 {
801 result.add(pathName);
802 }
803 }
804 return result;
805 }
806
807 @Override
808 public List<String> getResourcesInCluster(String clusterName)
809 {
810 return _zkClient.getChildren(HelixUtil.getIdealStatePath(clusterName));
811 }
812
813 @Override
814 public IdealState getResourceIdealState(String clusterName, String resourceName)
815 {
816 ZKHelixDataAccessor accessor =
817 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
818 Builder keyBuilder = accessor.keyBuilder();
819
820 return accessor.getProperty(keyBuilder.idealStates(resourceName));
821 }
822
823 @Override
824 public void setResourceIdealState(String clusterName,
825 String resourceName,
826 IdealState idealState)
827 {
828 ZKHelixDataAccessor accessor =
829 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
830 Builder keyBuilder = accessor.keyBuilder();
831
832 accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
833 }
834
835 @Override
836 public ExternalView getResourceExternalView(String clusterName, String resourceName)
837 {
838 ZKHelixDataAccessor accessor =
839 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
840 Builder keyBuilder = accessor.keyBuilder();
841 return accessor.getProperty(keyBuilder.externalView(resourceName));
842 }
843
844 @Override
845 public void addStateModelDef(String clusterName,
846 String stateModelDef,
847 StateModelDefinition stateModel)
848 {
849 if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
850 {
851 throw new HelixException("cluster " + clusterName + " is not setup yet");
852 }
853 String stateModelDefPath = HelixUtil.getStateModelDefinitionPath(clusterName);
854 String stateModelPath = stateModelDefPath + "/" + stateModelDef;
855 if (_zkClient.exists(stateModelPath))
856 {
857 logger.warn("Skip the operation.State Model directory exists:" + stateModelPath);
858 throw new HelixException("State model path " + stateModelPath + " already exists.");
859 }
860
861 ZKHelixDataAccessor accessor =
862 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
863 Builder keyBuilder = accessor.keyBuilder();
864 accessor.setProperty(keyBuilder.stateModelDef(stateModel.getId()), stateModel);
865 }
866
867 @Override
868 public void dropResource(String clusterName, String resourceName)
869 {
870 ZKHelixDataAccessor accessor =
871 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
872 Builder keyBuilder = accessor.keyBuilder();
873
874 accessor.removeProperty(keyBuilder.idealStates(resourceName));
875 accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
876 }
877
878 @Override
879 public List<String> getStateModelDefs(String clusterName)
880 {
881 return _zkClient.getChildren(HelixUtil.getStateModelDefinitionPath(clusterName));
882 }
883
884 @Override
885 public StateModelDefinition getStateModelDef(String clusterName, String stateModelName)
886 {
887 ZKHelixDataAccessor accessor =
888 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
889 Builder keyBuilder = accessor.keyBuilder();
890
891 return accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
892 }
893
894 @Override
895 public void addStat(String clusterName, final String statName)
896 {
897 if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
898 {
899 throw new HelixException("cluster " + clusterName + " is not setup yet");
900 }
901
902 String persistentStatsPath =
903 PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
904 ZkBaseDataAccessor<ZNRecord> baseAccessor =
905 new ZkBaseDataAccessor<ZNRecord>(_zkClient);
906
907 baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>()
908 {
909
910 @Override
911 public ZNRecord update(ZNRecord statsRec)
912 {
913 if (statsRec == null)
914 {
915
916 statsRec = new ZNRecord(PersistentStats.nodeName);
917 }
918
919 Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
920 Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
921 for (String newStat : newStatMap.keySet())
922 {
923 if (!currStatMap.containsKey(newStat))
924 {
925 currStatMap.put(newStat, newStatMap.get(newStat));
926 }
927 }
928 statsRec.setMapFields(currStatMap);
929
930 return statsRec;
931 }
932 }, AccessOption.PERSISTENT);
933 }
934
935 @Override
936 public void addAlert(final String clusterName, final String alertName)
937 {
938 if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
939 {
940 throw new HelixException("cluster " + clusterName + " is not setup yet");
941 }
942
943 ZkBaseDataAccessor<ZNRecord> baseAccessor =
944 new ZkBaseDataAccessor<ZNRecord>(_zkClient);
945
946 String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
947
948 baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>()
949 {
950
951 @Override
952 public ZNRecord update(ZNRecord alertsRec)
953 {
954 if (alertsRec == null)
955 {
956
957 alertsRec = new ZNRecord(Alerts.nodeName);
958
959 }
960
961 Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
962 StringBuilder newStatName = new StringBuilder();
963 Map<String, String> newAlertMap = new HashMap<String, String>();
964
965
966 AlertsHolder.parseAlert(alertName, newStatName, newAlertMap);
967
968
969 addStat(clusterName, newStatName.toString());
970
971
972 currAlertMap.put(alertName, newAlertMap);
973
974 alertsRec.setMapFields(currAlertMap);
975
976 return alertsRec;
977 }
978 }, AccessOption.PERSISTENT);
979 }
980
981 @Override
982 public void dropCluster(String clusterName)
983 {
984 logger.info("Deleting cluster " + clusterName);
985 ZKHelixDataAccessor accessor =
986 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
987 Builder keyBuilder = accessor.keyBuilder();
988
989 String root = "/" + clusterName;
990 if (accessor.getChildNames(keyBuilder.liveInstances()).size() > 0)
991 {
992 throw new HelixException("There are still live instances in the cluster, shut them down first.");
993 }
994
995 if (accessor.getProperty(keyBuilder.controllerLeader()) != null)
996 {
997 throw new HelixException("There are still LEADER in the cluster, shut them down first.");
998 }
999
1000 _zkClient.deleteRecursive(root);
1001 }
1002
1003 @Override
1004 public void dropStat(String clusterName, final String statName)
1005 {
1006 if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
1007 {
1008 throw new HelixException("cluster " + clusterName + " is not setup yet");
1009 }
1010
1011 String persistentStatsPath =
1012 PropertyPathConfig.getPath(PropertyType.PERSISTENTSTATS, clusterName);
1013 ZkBaseDataAccessor<ZNRecord> baseAccessor =
1014 new ZkBaseDataAccessor<ZNRecord>(_zkClient);
1015
1016 baseAccessor.update(persistentStatsPath, new DataUpdater<ZNRecord>()
1017 {
1018
1019 @Override
1020 public ZNRecord update(ZNRecord statsRec)
1021 {
1022 if (statsRec == null)
1023 {
1024 throw new HelixException("No stats record in ZK, nothing to drop");
1025 }
1026
1027 Map<String, Map<String, String>> currStatMap = statsRec.getMapFields();
1028 Map<String, Map<String, String>> newStatMap = StatsHolder.parseStat(statName);
1029
1030
1031 for (String newStat : newStatMap.keySet())
1032 {
1033 if (currStatMap.containsKey(newStat))
1034 {
1035 currStatMap.remove(newStat);
1036 }
1037 }
1038 statsRec.setMapFields(currStatMap);
1039
1040 return statsRec;
1041 }
1042 }, AccessOption.PERSISTENT);
1043 }
1044
1045 @Override
1046 public void dropAlert(String clusterName, final String alertName)
1047 {
1048 if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
1049 {
1050 throw new HelixException("cluster " + clusterName + " is not setup yet");
1051 }
1052
1053 String alertsPath = PropertyPathConfig.getPath(PropertyType.ALERTS, clusterName);
1054
1055 ZkBaseDataAccessor<ZNRecord> baseAccessor =
1056 new ZkBaseDataAccessor<ZNRecord>(_zkClient);
1057
1058 if (!baseAccessor.exists(alertsPath, 0))
1059 {
1060 throw new HelixException("No alerts node in ZK, nothing to drop");
1061 }
1062
1063 baseAccessor.update(alertsPath, new DataUpdater<ZNRecord>()
1064 {
1065 @Override
1066 public ZNRecord update(ZNRecord alertsRec)
1067 {
1068 if (alertsRec == null)
1069 {
1070 throw new HelixException("No alerts record in ZK, nothing to drop");
1071 }
1072
1073 Map<String, Map<String, String>> currAlertMap = alertsRec.getMapFields();
1074 currAlertMap.remove(alertName);
1075 alertsRec.setMapFields(currAlertMap);
1076
1077 return alertsRec;
1078 }
1079 }, AccessOption.PERSISTENT);
1080 }
1081
1082 @Override
1083 public void addClusterToGrandCluster(String clusterName, String grandCluster)
1084 {
1085 if (!ZKUtil.isClusterSetup(grandCluster, _zkClient))
1086 {
1087 throw new HelixException("Grand cluster " + grandCluster + " is not setup yet");
1088 }
1089
1090 if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
1091 {
1092 throw new HelixException("Cluster " + clusterName + " is not setup yet");
1093 }
1094
1095 IdealState idealState = new IdealState(clusterName);
1096
1097 idealState.setNumPartitions(1);
1098 idealState.setStateModelDefRef("LeaderStandby");
1099
1100 List<String> controllers = getInstancesInCluster(grandCluster);
1101 if (controllers.size() == 0)
1102 {
1103 throw new HelixException("Grand cluster " + grandCluster + " has no instances");
1104 }
1105 idealState.setReplicas(Integer.toString(controllers.size()));
1106 Collections.shuffle(controllers);
1107 idealState.getRecord().setListField(clusterName, controllers);
1108 idealState.setPartitionState(clusterName, controllers.get(0), "LEADER");
1109 for (int i = 1; i < controllers.size(); i++)
1110 {
1111 idealState.setPartitionState(clusterName, controllers.get(i), "STANDBY");
1112 }
1113
1114 ZKHelixDataAccessor accessor =
1115 new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
1116 Builder keyBuilder = accessor.keyBuilder();
1117
1118 accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
1119 }
1120
1121 @Override
1122 public void setConfig(HelixConfigScope scope, Map<String, String> properties)
1123 {
1124 _configAccessor.set(scope, properties);
1125 }
1126
1127 @Override
1128 public Map<String, String> getConfig(HelixConfigScope scope, List<String> keys)
1129 {
1130 return _configAccessor.get(scope, keys);
1131 }
1132
1133 @Override
1134 public List<String> getConfigKeys(HelixConfigScope scope)
1135 {
1136 return _configAccessor.getKeys(scope);
1137 }
1138
1139 @Override
1140 public void removeConfig(HelixConfigScope scope, List<String> keys)
1141 {
1142 _configAccessor.remove(scope, keys);
1143 }
1144
1145 @Override
1146 public void rebalance(String clusterName, String resourceName, int replica)
1147 {
1148 rebalance(clusterName, resourceName, replica, resourceName, "");
1149 }
1150
1151 @Override
1152 public void rebalance(String clusterName, String resourceName, int replica, String keyPrefix, String group)
1153 {
1154 List<String> instanceNames = new LinkedList<String>();
1155 if(keyPrefix == null || keyPrefix.length() == 0)
1156 {
1157 keyPrefix = resourceName;
1158 }
1159 if(group != null && group.length() > 0)
1160 {
1161 instanceNames = getInstancesInClusterWithTag(clusterName, group);
1162 }
1163 if(instanceNames.size() == 0)
1164 {
1165 logger.info("No tags found for resource " + resourceName + ", use all instances");
1166 instanceNames = getInstancesInCluster(clusterName);
1167 group = "";
1168 }
1169 else
1170 {
1171 logger.info("Found instances with tag for " + resourceName + " " + instanceNames);
1172 }
1173 rebalance(clusterName, resourceName, replica, keyPrefix, instanceNames, group);
1174 }
1175
1176 @Override
1177 public void rebalance(String clusterName, String resourceName, int replica, List<String> instances)
1178 {
1179 rebalance(clusterName, resourceName, replica, resourceName, instances, "");
1180 }
1181
1182
1183 void rebalance(String clusterName,
1184 String resourceName,
1185 int replica,
1186 String keyPrefix,
1187 List<String> instanceNames,
1188 String groupId)
1189 {
1190
1191 Collections.sort(instanceNames);
1192
1193 IdealState idealState = getResourceIdealState(clusterName, resourceName);
1194 if (idealState == null)
1195 {
1196 throw new HelixException("Resource: " + resourceName + " has NOT been added yet");
1197 }
1198
1199 if(groupId != null && groupId.length() > 0)
1200 {
1201 idealState.setInstanceGroupTag(groupId);
1202 }
1203 idealState.setReplicas(Integer.toString(replica));
1204 int partitions = idealState.getNumPartitions();
1205 String stateModelName = idealState.getStateModelDefRef();
1206 StateModelDefinition stateModDef = getStateModelDef(clusterName, stateModelName);
1207
1208 if (stateModDef == null)
1209 {
1210 throw new HelixException("cannot find state model: " + stateModelName);
1211 }
1212
1213
1214 List<String> statePriorityList = stateModDef.getStatesPriorityList();
1215
1216 String masterStateValue = null;
1217 String slaveStateValue = null;
1218 replica--;
1219
1220 for (String state : statePriorityList)
1221 {
1222 String count = stateModDef.getNumInstancesPerState(state);
1223 if (count.equals("1"))
1224 {
1225 if (masterStateValue != null)
1226 {
1227 throw new HelixException("Invalid or unsupported state model definition");
1228 }
1229 masterStateValue = state;
1230 }
1231 else if (count.equalsIgnoreCase("R"))
1232 {
1233 if (slaveStateValue != null)
1234 {
1235 throw new HelixException("Invalid or unsupported state model definition");
1236 }
1237 slaveStateValue = state;
1238 }
1239 else if (count.equalsIgnoreCase("N"))
1240 {
1241 if (!(masterStateValue == null && slaveStateValue == null))
1242 {
1243 throw new HelixException("Invalid or unsupported state model definition");
1244 }
1245 replica = instanceNames.size() - 1;
1246 masterStateValue = slaveStateValue = state;
1247 }
1248 }
1249 if (masterStateValue == null && slaveStateValue == null)
1250 {
1251 throw new HelixException("Invalid or unsupported state model definition");
1252 }
1253
1254 if (masterStateValue == null)
1255 {
1256 masterStateValue = slaveStateValue;
1257 }
1258 if (idealState.getIdealStateMode() != IdealStateModeProperty.AUTO_REBALANCE)
1259 {
1260 ZNRecord newIdealState =
1261 DefaultIdealStateCalculator.calculateIdealState(instanceNames,
1262 partitions,
1263 replica,
1264 keyPrefix,
1265 masterStateValue,
1266 slaveStateValue);
1267
1268
1269 if (idealState.getIdealStateMode() == IdealStateModeProperty.AUTO)
1270 {
1271 idealState.getRecord().setListFields(newIdealState.getListFields());
1272 idealState.getRecord().setMapFields(newIdealState.getMapFields());
1273 }
1274 if (idealState.getIdealStateMode() == IdealStateModeProperty.CUSTOMIZED)
1275 {
1276 idealState.getRecord().setMapFields(newIdealState.getMapFields());
1277 }
1278 }
1279 else
1280 {
1281 for (int i = 0; i < partitions; i++)
1282 {
1283 String partitionName = keyPrefix + "_" + i;
1284 idealState.getRecord().setMapField(partitionName, new HashMap<String, String>());
1285 idealState.getRecord().setListField(partitionName, new ArrayList<String>());
1286 }
1287 }
1288 setResourceIdealState(clusterName, resourceName, idealState);
1289 }
1290
1291 @Override
1292 public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException
1293 {
1294 ZNRecord idealStateRecord =
1295 (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateFile)));
1296 if (idealStateRecord.getId() == null
1297 || !idealStateRecord.getId().equals(resourceName))
1298 {
1299 throw new IllegalArgumentException("ideal state must have same id as resource name");
1300 }
1301 setResourceIdealState(clusterName, resourceName, new IdealState(idealStateRecord));
1302 }
1303
1304 private static byte[] readFile(String filePath) throws IOException
1305 {
1306 File file = new File(filePath);
1307
1308 int size = (int) file.length();
1309 byte[] bytes = new byte[size];
1310 DataInputStream dis = new DataInputStream(new FileInputStream(file));
1311 int read = 0;
1312 int numRead = 0;
1313 while (read < bytes.length
1314 && (numRead = dis.read(bytes, read, bytes.length - read)) >= 0)
1315 {
1316 read = read + numRead;
1317 }
1318 return bytes;
1319 }
1320
1321 public void addStateModelDef(String clusterName,
1322 String stateModelDefName,
1323 String stateModelDefFile) throws IOException
1324 {
1325 ZNRecord record =
1326 (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefFile)));
1327 if (record == null || record.getId() == null
1328 || !record.getId().equals(stateModelDefName))
1329 {
1330 throw new IllegalArgumentException("state model definition must have same id as state model def name");
1331 }
1332 addStateModelDef(clusterName, stateModelDefName, new StateModelDefinition(record));
1333
1334 }
1335
1336 @Override
1337 public void setConstraint(String clusterName,
1338 final ConstraintType constraintType,
1339 final String constraintId,
1340 final ConstraintItem constraintItem)
1341 {
1342 ZkBaseDataAccessor<ZNRecord> baseAccessor =
1343 new ZkBaseDataAccessor<ZNRecord>(_zkClient);
1344
1345 Builder keyBuilder = new Builder(clusterName);
1346 String path = keyBuilder.constraint(constraintType.toString()).getPath();
1347
1348 baseAccessor.update(path, new DataUpdater<ZNRecord>()
1349 {
1350 @Override
1351 public ZNRecord update(ZNRecord currentData)
1352 {
1353 ClusterConstraints constraints = currentData == null?
1354 new ClusterConstraints(constraintType) : new ClusterConstraints(currentData);
1355
1356 constraints.addConstraintItem(constraintId, constraintItem);
1357 return constraints.getRecord();
1358 }
1359 }, AccessOption.PERSISTENT);
1360 }
1361
1362 @Override
1363 public void removeConstraint(String clusterName,
1364 final ConstraintType constraintType,
1365 final String constraintId)
1366 {
1367 ZkBaseDataAccessor<ZNRecord> baseAccessor =
1368 new ZkBaseDataAccessor<ZNRecord>(_zkClient);
1369
1370 Builder keyBuilder = new Builder(clusterName);
1371 String path = keyBuilder.constraint(constraintType.toString()).getPath();
1372
1373 baseAccessor.update(path, new DataUpdater<ZNRecord>()
1374 {
1375 @Override
1376 public ZNRecord update(ZNRecord currentData)
1377 {
1378 if (currentData != null) {
1379 ClusterConstraints constraints = new ClusterConstraints(currentData);
1380
1381 constraints.removeConstraintItem(constraintId);
1382 return constraints.getRecord();
1383 }
1384 return null;
1385 }
1386 }, AccessOption.PERSISTENT);
1387 }
1388
1389 @Override
1390 public ClusterConstraints getConstraints(String clusterName, ConstraintType constraintType)
1391 {
1392 HelixDataAccessor accessor =
1393 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
1394
1395 Builder keyBuilder = new Builder(clusterName);
1396 return accessor.getProperty(keyBuilder.constraint(constraintType.toString()));
1397 }
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407 @Override
1408 public void rebalance(String clusterName,
1409 IdealState currentIdealState,
1410 List<String> instanceNames)
1411 {
1412 Set<String> activeInstances = new HashSet<String>();
1413 for (String partition : currentIdealState.getPartitionSet())
1414 {
1415 activeInstances.addAll(currentIdealState.getRecord().getListField(partition));
1416 }
1417 instanceNames.removeAll(activeInstances);
1418 Map<String, Object> previousIdealState = RebalanceUtil.buildInternalIdealState(currentIdealState);
1419
1420 Map<String, Object> balancedRecord =
1421 DefaultIdealStateCalculator.calculateNextIdealState(instanceNames,
1422 previousIdealState);
1423 StateModelDefinition stateModDef =
1424 this.getStateModelDef(clusterName, currentIdealState.getStateModelDefRef());
1425
1426 if (stateModDef == null)
1427 {
1428 throw new HelixException("cannot find state model: " + currentIdealState.getStateModelDefRef());
1429 }
1430 String[] states = RebalanceUtil.parseStates(clusterName, stateModDef);
1431
1432 ZNRecord newIdealStateRecord =
1433 DefaultIdealStateCalculator.convertToZNRecord(balancedRecord,
1434 currentIdealState.getResourceName(),
1435 states[0],
1436 states[1]);
1437 Set<String> partitionSet = new HashSet<String>();
1438 partitionSet.addAll(newIdealStateRecord.getMapFields().keySet());
1439 partitionSet.addAll(newIdealStateRecord.getListFields().keySet());
1440
1441 Map<String, String> reversePartitionIndex =
1442 (Map<String, String>) balancedRecord.get("reversePartitionIndex");
1443 for (String partition : partitionSet)
1444 {
1445 if (reversePartitionIndex.containsKey(partition))
1446 {
1447 String originPartitionName = reversePartitionIndex.get(partition);
1448 if (partition.equals(originPartitionName))
1449 {
1450 continue;
1451 }
1452 newIdealStateRecord.getMapFields()
1453 .put(originPartitionName,
1454 newIdealStateRecord.getMapField(partition));
1455 newIdealStateRecord.getMapFields().remove(partition);
1456
1457 newIdealStateRecord.getListFields()
1458 .put(originPartitionName,
1459 newIdealStateRecord.getListField(partition));
1460 newIdealStateRecord.getListFields().remove(partition);
1461 }
1462 }
1463
1464 newIdealStateRecord.getSimpleFields()
1465 .putAll(currentIdealState.getRecord().getSimpleFields());
1466 IdealState newIdealState = new IdealState(newIdealStateRecord);
1467 setResourceIdealState(clusterName, newIdealStateRecord.getId(), newIdealState);
1468 }
1469
1470 @Override
1471 public void addInstanceTag(String clusterName, String instanceName, String tag)
1472 {
1473 if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
1474 {
1475 throw new HelixException("cluster " + clusterName + " is not setup yet");
1476 }
1477
1478 if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT))
1479 {
1480 throw new HelixException("cluster " + clusterName + " instance "+instanceName +" is not setup yet");
1481 }
1482 ZKHelixDataAccessor accessor =
1483 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
1484 Builder keyBuilder = accessor.keyBuilder();
1485
1486 InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
1487 config.addTag(tag);
1488 accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
1489 }
1490
1491 @Override
1492 public void removeInstanceTag(String clusterName, String instanceName,
1493 String tag)
1494 {
1495 if (!ZKUtil.isClusterSetup(clusterName, _zkClient))
1496 {
1497 throw new HelixException("cluster " + clusterName + " is not setup yet");
1498 }
1499
1500 if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT))
1501 {
1502 throw new HelixException("cluster " + clusterName + " instance "+instanceName +" is not setup yet");
1503 }
1504 ZKHelixDataAccessor accessor =
1505 new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
1506 Builder keyBuilder = accessor.keyBuilder();
1507
1508 InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
1509 config.removeTag(tag);
1510 accessor.setProperty(keyBuilder.instanceConfig(instanceName), config);
1511 }
1512
1513 public void close()
1514 {
1515 if(_zkClient!=null)
1516 {
1517 _zkClient.close();
1518 }
1519 }
1520
1521 }