1 package org.apache.helix.tools;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.File;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31
32 import org.I0Itec.zkclient.IZkChildListener;
33 import org.I0Itec.zkclient.IZkDataListener;
34 import org.apache.commons.cli.CommandLine;
35 import org.apache.commons.cli.CommandLineParser;
36 import org.apache.commons.cli.GnuParser;
37 import org.apache.commons.cli.HelpFormatter;
38 import org.apache.commons.cli.Option;
39 import org.apache.commons.cli.OptionBuilder;
40 import org.apache.commons.cli.Options;
41 import org.apache.commons.cli.ParseException;
42 import org.apache.helix.HelixDataAccessor;
43 import org.apache.helix.HelixDefinedState;
44 import org.apache.helix.PropertyPathConfig;
45 import org.apache.helix.PropertyType;
46 import org.apache.helix.ZNRecord;
47 import org.apache.helix.PropertyKey.Builder;
48 import org.apache.helix.controller.pipeline.Stage;
49 import org.apache.helix.controller.pipeline.StageContext;
50 import org.apache.helix.controller.stages.AttributeName;
51 import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
52 import org.apache.helix.controller.stages.BestPossibleStateOutput;
53 import org.apache.helix.controller.stages.ClusterDataCache;
54 import org.apache.helix.controller.stages.ClusterEvent;
55 import org.apache.helix.controller.stages.CurrentStateComputationStage;
56 import org.apache.helix.controller.stages.ResourceComputationStage;
57 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
58 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
59 import org.apache.helix.manager.zk.ZkClient;
60 import org.apache.helix.model.ExternalView;
61 import org.apache.helix.model.IdealState;
62 import org.apache.helix.model.Partition;
63 import org.apache.helix.participant.statemachine.StateModel;
64 import org.apache.helix.participant.statemachine.StateModelFactory;
65 import org.apache.helix.store.PropertyJsonComparator;
66 import org.apache.helix.store.PropertyJsonSerializer;
67 import org.apache.helix.util.ZKClientPool;
68 import org.apache.log4j.Logger;
69
70
71 public class ClusterStateVerifier
72 {
73 public static String cluster = "cluster";
74 public static String zkServerAddress = "zkSvr";
75 public static String help = "help";
76 public static String timeout = "timeout";
77 public static String period = "period";
78
79 private static Logger LOG = Logger.getLogger(ClusterStateVerifier.class);
80
/**
 * A condition on cluster state that can be evaluated on demand.
 */
public interface Verifier {
  /**
   * Evaluates the condition once.
   *
   * @return {@code true} if the condition currently holds, {@code false} otherwise
   */
  boolean verify();
}
85
86 public interface ZkVerifier extends Verifier
87 {
88 ZkClient getZkClient();
89
90 String getClusterName();
91 }
92
93 static class ExtViewVeriferZkListener implements IZkChildListener, IZkDataListener
94 {
95 final CountDownLatch _countDown;
96 final ZkClient _zkClient;
97 final Verifier _verifier;
98
99 public ExtViewVeriferZkListener(CountDownLatch countDown,
100 ZkClient zkClient,
101 ZkVerifier verifier)
102 {
103 _countDown = countDown;
104 _zkClient = zkClient;
105 _verifier = verifier;
106 }
107
108 @Override
109 public void handleDataChange(String dataPath, Object data) throws Exception
110 {
111 boolean result = _verifier.verify();
112 if (result == true)
113 {
114 _countDown.countDown();
115 }
116 }
117
118 @Override
119 public void handleDataDeleted(String dataPath) throws Exception
120 {
121
122
123 }
124
125 @Override
126 public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception
127 {
128 for (String child : currentChilds)
129 {
130 String childPath =
131 parentPath.equals("/") ? parentPath + child : parentPath + "/" + child;
132 _zkClient.subscribeDataChanges(childPath, this);
133 }
134
135 boolean result = _verifier.verify();
136 if (result == true)
137 {
138 _countDown.countDown();
139 }
140 }
141
142 }
143
144
145
146
147 public static class BestPossAndExtViewZkVerifier implements ZkVerifier
148 {
149 private final String zkAddr;
150 private final String clusterName;
151 private final Map<String, Map<String, String>> errStates;
152 private final ZkClient zkClient;
153
154 public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName)
155 {
156 this(zkAddr, clusterName, null);
157 }
158
159 public BestPossAndExtViewZkVerifier(String zkAddr,
160 String clusterName,
161 Map<String, Map<String, String>> errStates)
162 {
163 if (zkAddr == null || clusterName == null)
164 {
165 throw new IllegalArgumentException("requires zkAddr|clusterName");
166 }
167 this.zkAddr = zkAddr;
168 this.clusterName = clusterName;
169 this.errStates = errStates;
170 this.zkClient = ZKClientPool.getZkClient(zkAddr);
171 }
172
173 @Override
174 public boolean verify()
175 {
176 try
177 {
178 HelixDataAccessor accessor =
179 new ZKHelixDataAccessor(clusterName,
180 new ZkBaseDataAccessor<ZNRecord>(zkClient));
181
182 return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates);
183 }
184 catch (Exception e)
185 {
186 LOG.error("exception in verification", e);
187 }
188 return false;
189 }
190
191 @Override
192 public ZkClient getZkClient()
193 {
194 return zkClient;
195 }
196
197 @Override
198 public String getClusterName()
199 {
200 return clusterName;
201 }
202
203 @Override
204 public String toString()
205 {
206 String verifierName = getClass().getName();
207 verifierName =
208 verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length());
209 return verifierName + "(" + clusterName + "@" + zkAddr + ")";
210 }
211 }
212
213 public static class MasterNbInExtViewVerifier implements ZkVerifier
214 {
215 private final String zkAddr;
216 private final String clusterName;
217 private final ZkClient zkClient;
218
219 public MasterNbInExtViewVerifier(String zkAddr, String clusterName)
220 {
221 if (zkAddr == null || clusterName == null)
222 {
223 throw new IllegalArgumentException("requires zkAddr|clusterName");
224 }
225 this.zkAddr = zkAddr;
226 this.clusterName = clusterName;
227 this.zkClient = ZKClientPool.getZkClient(zkAddr);
228 }
229
230 @Override
231 public boolean verify()
232 {
233 try
234 {
235 ZKHelixDataAccessor accessor =
236 new ZKHelixDataAccessor(clusterName,
237 new ZkBaseDataAccessor<ZNRecord>(zkClient));
238
239 return ClusterStateVerifier.verifyMasterNbInExtView(accessor);
240 }
241 catch (Exception e)
242 {
243 LOG.error("exception in verification", e);
244 }
245 return false;
246 }
247
248 @Override
249 public ZkClient getZkClient()
250 {
251 return zkClient;
252 }
253
254 @Override
255 public String getClusterName()
256 {
257 return clusterName;
258 }
259
260 }
261
262 static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
263 Map<String, Map<String, String>> errStates)
264 {
265 try
266 {
267 Builder keyBuilder = accessor.keyBuilder();
268
269 ClusterDataCache cache = new ClusterDataCache();
270 cache.refresh(accessor);
271
272 Map<String, IdealState> idealStates = cache.getIdealStates();
273 if (idealStates == null)
274 {
275
276 idealStates = Collections.emptyMap();
277 }
278
279 Map<String, ExternalView> extViews =
280 accessor.getChildValuesMap(keyBuilder.externalViews());
281 if (extViews == null)
282 {
283 extViews = Collections.emptyMap();
284 }
285
286
287
288 for (String resource : extViews.keySet())
289 {
290 if (!idealStates.containsKey(resource))
291 {
292 idealStates.put(resource, new IdealState(resource));
293 }
294 }
295
296
297 BestPossibleStateOutput bestPossOutput =
298 ClusterStateVerifier.calcBestPossState(cache);
299 Map<String, Map<Partition, Map<String, String>>> bestPossStateMap = bestPossOutput.getStateMap();
300
301
302 if (errStates != null)
303 {
304 for (String resourceName : errStates.keySet())
305 {
306 Map<String, String> partErrStates = errStates.get(resourceName);
307 for (String partitionName : partErrStates.keySet())
308 {
309 String instanceName = partErrStates.get(partitionName);
310
311 if (!bestPossStateMap.containsKey(resourceName)) {
312 bestPossStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
313 }
314 Partition partition = new Partition(partitionName);
315 if (!bestPossStateMap.get(resourceName).containsKey(partition)) {
316 bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>());
317 }
318 bestPossStateMap.get(resourceName).get(partition).put(instanceName, HelixDefinedState.ERROR.toString());
319 }
320 }
321 }
322
323
324
325
326 for (String resourceName : idealStates.keySet())
327 {
328 ExternalView extView = extViews.get(resourceName);
329 if (extView == null)
330 {
331 LOG.info("externalView for " + resourceName + " is not available");
332 return false;
333 }
334
335
336 Map<Partition, Map<String, String>> bpStateMap =
337 bestPossOutput.getResourceMap(resourceName);
338 Iterator<Entry<Partition, Map<String, String>>> iter =
339 bpStateMap.entrySet().iterator();
340 while (iter.hasNext())
341 {
342 Map.Entry<Partition, Map<String, String>> entry = iter.next();
343 Map<String, String> instanceStateMap = entry.getValue();
344 if (instanceStateMap.isEmpty())
345 {
346 iter.remove();
347 } else
348 {
349
350 Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
351 while (insIter.hasNext())
352 {
353 Map.Entry<String, String> insEntry = insIter.next();
354 String state = insEntry.getValue();
355 if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString()))
356 {
357 insIter.remove();
358 }
359 }
360 }
361 }
362
363
364
365
366 int extViewSize = extView.getRecord().getMapFields().size();
367 int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
368 if (extViewSize != bestPossStateSize)
369 {
370 LOG.info("exterView size (" + extViewSize
371 + ") is different from bestPossState size (" + bestPossStateSize
372 + ") for resource: " + resourceName);
373
374
375
376
377
378
379
380 return false;
381 }
382
383
384 for (String partition : extView.getRecord().getMapFields().keySet())
385 {
386 Map<String, String> evInstanceStateMap =
387 extView.getRecord().getMapField(partition);
388 Map<String, String> bpInstanceStateMap =
389 bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
390
391 boolean result =
392 ClusterStateVerifier.<String, String> compareMap(evInstanceStateMap,
393 bpInstanceStateMap);
394 if (result == false)
395 {
396 LOG.info("externalView is different from bestPossibleState for partition:"
397 + partition);
398
399
400
401 return false;
402 }
403 }
404 }
405 return true;
406 }
407 catch (Exception e)
408 {
409 LOG.error("exception in verification", e);
410 return false;
411 }
412
413 }
414
415 static boolean verifyMasterNbInExtView(HelixDataAccessor accessor)
416 {
417 Builder keyBuilder = accessor.keyBuilder();
418
419 Map<String, IdealState> idealStates =
420 accessor.getChildValuesMap(keyBuilder.idealStates());
421 if (idealStates == null || idealStates.size() == 0)
422 {
423 LOG.info("No resource idealState");
424 return true;
425 }
426
427 Map<String, ExternalView> extViews =
428 accessor.getChildValuesMap(keyBuilder.externalViews());
429 if (extViews == null || extViews.size() < idealStates.size())
430 {
431 LOG.info("No externalViews | externalView.size() < idealState.size()");
432 return false;
433 }
434
435 for (String resource : extViews.keySet())
436 {
437 int partitions = idealStates.get(resource).getNumPartitions();
438 Map<String, Map<String, String>> instanceStateMap =
439 extViews.get(resource).getRecord().getMapFields();
440 if (instanceStateMap.size() < partitions)
441 {
442 LOG.info("Number of externalViews (" + instanceStateMap.size()
443 + ") < partitions (" + partitions + ")");
444 return false;
445 }
446
447 for (String partition : instanceStateMap.keySet())
448 {
449 boolean foundMaster = false;
450 for (String instance : instanceStateMap.get(partition).keySet())
451 {
452 if (instanceStateMap.get(partition).get(instance).equalsIgnoreCase("MASTER"))
453 {
454 foundMaster = true;
455 break;
456 }
457 }
458 if (!foundMaster)
459 {
460 LOG.info("No MASTER for partition: " + partition);
461 return false;
462 }
463 }
464 }
465 return true;
466 }
467
468 static void runStage(ClusterEvent event, Stage stage) throws Exception
469 {
470 StageContext context = new StageContext();
471 stage.init(context);
472 stage.preProcess();
473 stage.process(event);
474 stage.postProcess();
475 }
476
477
478
479
480
481
482
483
484
485
486 static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception
487 {
488 ClusterEvent event = new ClusterEvent("sampleEvent");
489 event.addAttribute("ClusterDataCache", cache);
490
491 ResourceComputationStage rcState = new ResourceComputationStage();
492 CurrentStateComputationStage csStage = new CurrentStateComputationStage();
493 BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
494
495 runStage(event, rcState);
496 runStage(event, csStage);
497 runStage(event, bpStage);
498
499 BestPossibleStateOutput output =
500 event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
501
502
503 return output;
504 }
505
506 public static <K, V> boolean compareMap(Map<K, V> map1, Map<K, V> map2)
507 {
508 boolean isEqual = true;
509 if (map1 == null && map2 == null)
510 {
511
512 }
513 else if (map1 == null && map2 != null)
514 {
515 if (!map2.isEmpty())
516 {
517 isEqual = false;
518 }
519 }
520 else if (map1 != null && map2 == null)
521 {
522 if (!map1.isEmpty())
523 {
524 isEqual = false;
525 }
526 }
527 else
528 {
529
530 if (map1.size() != map2.size())
531 {
532 isEqual = false;
533 }
534
535 for (K key : map1.keySet())
536 {
537 if (!map1.get(key).equals(map2.get(key)))
538 {
539 LOG.debug("different value for key: " + key + "(map1: " + map1.get(key)
540 + ", map2: " + map2.get(key) + ")");
541 isEqual = false;
542 break;
543 }
544 }
545 }
546 return isEqual;
547 }
548
549 public static boolean verifyByPolling(Verifier verifier)
550 {
551 return verifyByPolling(verifier, 30 * 1000);
552 }
553
554 public static boolean verifyByPolling(Verifier verifier, long timeout)
555 {
556 return verifyByPolling(verifier, timeout, 1000);
557 }
558
559 public static boolean verifyByPolling(Verifier verifier, long timeout, long period)
560 {
561 long startTime = System.currentTimeMillis();
562 boolean result = false;
563 try
564 {
565 long curTime;
566 do
567 {
568 Thread.sleep(period);
569 result = verifier.verify();
570 if (result == true)
571 {
572 break;
573 }
574 curTime = System.currentTimeMillis();
575 }
576 while (curTime <= startTime + timeout);
577 return result;
578 }
579 catch (Exception e)
580 {
581
582 e.printStackTrace();
583 }
584 finally
585 {
586 long endTime = System.currentTimeMillis();
587
588
589 System.err.println(result + ": " + verifier + ": wait " + (endTime - startTime)
590 + "ms to verify");
591
592 }
593 return false;
594 }
595
596 public static boolean verifyByZkCallback(ZkVerifier verifier)
597 {
598 return verifyByZkCallback(verifier, 30000);
599 }
600
601 public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout)
602 {
603 long startTime = System.currentTimeMillis();
604 CountDownLatch countDown = new CountDownLatch(1);
605 ZkClient zkClient = verifier.getZkClient();
606 String clusterName = verifier.getClusterName();
607
608
609
610 zkClient.createEphemeral("/" + clusterName + "/CONFIGS/CLUSTER/verify");
611
612 ExtViewVeriferZkListener listener =
613 new ExtViewVeriferZkListener(countDown, zkClient, verifier);
614
615 String extViewPath =
616 PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, clusterName);
617 zkClient.subscribeChildChanges(extViewPath, listener);
618 for (String child : zkClient.getChildren(extViewPath))
619 {
620 String childPath =
621 extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
622 zkClient.subscribeDataChanges(childPath, listener);
623 }
624
625
626 boolean result = verifier.verify();
627 if (result == false)
628 {
629 try
630 {
631 result = countDown.await(timeout, TimeUnit.MILLISECONDS);
632 if (result == false)
633 {
634
635 result = verifier.verify();
636 }
637 }
638 catch (Exception e)
639 {
640
641 e.printStackTrace();
642 }
643 }
644
645
646 zkClient.unsubscribeChildChanges(extViewPath, listener);
647 for (String child : zkClient.getChildren(extViewPath))
648 {
649 String childPath =
650 extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
651 zkClient.unsubscribeDataChanges(childPath, listener);
652 }
653
654 long endTime = System.currentTimeMillis();
655
656 zkClient.delete("/" + clusterName + "/CONFIGS/CLUSTER/verify");
657
658 System.err.println(result + ": wait " + (endTime - startTime) + "ms, " + verifier);
659
660 return result;
661 }
662
663 @SuppressWarnings("static-access")
664 private static Options constructCommandLineOptions()
665 {
666 Option helpOption =
667 OptionBuilder.withLongOpt(help)
668 .withDescription("Prints command-line options info")
669 .create();
670
671 Option zkServerOption =
672 OptionBuilder.withLongOpt(zkServerAddress)
673 .withDescription("Provide zookeeper address")
674 .create();
675 zkServerOption.setArgs(1);
676 zkServerOption.setRequired(true);
677 zkServerOption.setArgName("ZookeeperServerAddress(Required)");
678
679 Option clusterOption =
680 OptionBuilder.withLongOpt(cluster)
681 .withDescription("Provide cluster name")
682 .create();
683 clusterOption.setArgs(1);
684 clusterOption.setRequired(true);
685 clusterOption.setArgName("Cluster name (Required)");
686
687 Option timeoutOption =
688 OptionBuilder.withLongOpt(timeout)
689 .withDescription("Timeout value for verification")
690 .create();
691 timeoutOption.setArgs(1);
692 timeoutOption.setArgName("Timeout value (Optional), default=30s");
693
694 Option sleepIntervalOption =
695 OptionBuilder.withLongOpt(period)
696 .withDescription("Polling period for verification")
697 .create();
698 sleepIntervalOption.setArgs(1);
699 sleepIntervalOption.setArgName("Polling period value (Optional), default=1s");
700
701 Options options = new Options();
702 options.addOption(helpOption);
703 options.addOption(zkServerOption);
704 options.addOption(clusterOption);
705 options.addOption(timeoutOption);
706 options.addOption(sleepIntervalOption);
707
708 return options;
709 }
710
711 public static void printUsage(Options cliOptions)
712 {
713 HelpFormatter helpFormatter = new HelpFormatter();
714 helpFormatter.setWidth(1000);
715 helpFormatter.printHelp("java " + ClusterSetup.class.getName(), cliOptions);
716 }
717
718 public static CommandLine processCommandLineArgs(String[] cliArgs)
719 {
720 CommandLineParser cliParser = new GnuParser();
721 Options cliOptions = constructCommandLineOptions();
722
723
724 try
725 {
726 return cliParser.parse(cliOptions, cliArgs);
727 }
728 catch (ParseException pe)
729 {
730 System.err.println("CommandLineClient: failed to parse command-line options: "
731 + pe.toString());
732 printUsage(cliOptions);
733 System.exit(1);
734 }
735 return null;
736 }
737
738 public static boolean verifyState(String[] args)
739 {
740
741 String clusterName = "storage-cluster";
742 String zkServer = "localhost:2181";
743 long timeoutValue = 0;
744 long periodValue = 1000;
745
746 if (args.length > 0)
747 {
748 CommandLine cmd = processCommandLineArgs(args);
749 zkServer = cmd.getOptionValue(zkServerAddress);
750 clusterName = cmd.getOptionValue(cluster);
751 String timeoutStr = cmd.getOptionValue(timeout);
752 String periodStr = cmd.getOptionValue(period);
753 if (timeoutStr != null)
754 {
755 try
756 {
757 timeoutValue = Long.parseLong(timeoutStr);
758 }
759 catch (Exception e)
760 {
761 System.err.println("Exception in converting " + timeoutStr
762 + " to long. Use default (0)");
763 }
764 }
765
766 if (periodStr != null)
767 {
768 try
769 {
770 periodValue = Long.parseLong(periodStr);
771 }
772 catch (Exception e)
773 {
774 System.err.println("Exception in converting " + periodStr
775 + " to long. Use default (1000)");
776 }
777 }
778
779 }
780
781
782
783
784 return verifyByZkCallback(new BestPossAndExtViewZkVerifier(zkServer, clusterName),
785 timeoutValue);
786 }
787
788 public static void main(String[] args)
789 {
790 boolean result = verifyState(args);
791 System.out.println(result ? "Successful" : "failed");
792 System.exit(1);
793 }
794
795 }