1 package org.apache.helix.mock.controller;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.concurrent.CountDownLatch;
23
24 import org.apache.helix.HelixManager;
25 import org.apache.helix.HelixManagerFactory;
26 import org.apache.helix.InstanceType;
27 import org.apache.helix.ZkHelixTestManager;
28 import org.apache.helix.controller.HelixControllerMain;
29 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
30 import org.apache.helix.participant.StateMachineEngine;
31 import org.apache.log4j.Logger;
32
33
34 public class ClusterController extends Thread
35 {
36 private static Logger LOG =
37 Logger.getLogger(ClusterController.class);
38
39 private final CountDownLatch _startCountDown = new CountDownLatch(1);
40 private final CountDownLatch _stopCountDown = new CountDownLatch(1);
41 private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
42 private final String _controllerMode;
43 private final String _zkAddr;
44
45 private ZkHelixTestManager _manager;
46
47 public ClusterController(String clusterName, String controllerName, String zkAddr) throws Exception
48 {
49 this(clusterName, controllerName, zkAddr, HelixControllerMain.STANDALONE.toString());
50 }
51
52 public ClusterController(String clusterName,
53 String controllerName,
54 String zkAddr,
55 String controllerMode) throws Exception
56 {
57 _controllerMode = controllerMode;
58 _zkAddr = zkAddr;
59
60 if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
61 {
62 _manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
63 }
64 else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
65 {
66 _manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkAddr);
67 }
68 else
69 {
70 throw new IllegalArgumentException("Controller mode: " + controllerMode
71 + " NOT recoginized");
72 }
73 }
74
75 public ZkHelixTestManager getManager()
76 {
77 return _manager;
78 }
79
80 public void syncStop()
81 {
82 if (_manager == null)
83 {
84 LOG.warn("manager already stopped");
85 return;
86 }
87
88 _stopCountDown.countDown();
89 try
90 {
91 _waitStopFinishCountDown.await();
92 }
93 catch (InterruptedException e)
94 {
95
96 e.printStackTrace();
97 }
98 }
99
100 public void syncStart()
101 {
102
103
104 super.start();
105 try
106 {
107 _startCountDown.await();
108 }
109 catch (InterruptedException e)
110 {
111
112 e.printStackTrace();
113 }
114 }
115
116 @Override
117 public void run()
118 {
119 try
120 {
121 try
122 {
123 if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
124 {
125 _manager.connect();
126 }
127 else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
128 {
129 DistClusterControllerStateModelFactory stateModelFactory =
130 new DistClusterControllerStateModelFactory(_zkAddr);
131
132 StateMachineEngine stateMach = _manager.getStateMachineEngine();
133 stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
134 _manager.connect();
135 }
136 }
137 catch (Exception e)
138 {
139
140 e.printStackTrace();
141 }
142 finally
143 {
144 _startCountDown.countDown();
145 _stopCountDown.await();
146 }
147 }
148 catch (Exception e)
149 {
150
151 e.printStackTrace();
152 }
153 finally
154 {
155 synchronized (_manager)
156 {
157 _manager.disconnect();
158 _manager = null;
159 }
160 _waitStopFinishCountDown.countDown();
161 }
162 }
163 }