View Javadoc

1   package org.apache.helix.mock.controller;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.concurrent.CountDownLatch;
23  
24  import org.apache.helix.HelixManager;
25  import org.apache.helix.HelixManagerFactory;
26  import org.apache.helix.InstanceType;
27  import org.apache.helix.ZkHelixTestManager;
28  import org.apache.helix.controller.HelixControllerMain;
29  import org.apache.helix.participant.DistClusterControllerStateModelFactory;
30  import org.apache.helix.participant.StateMachineEngine;
31  import org.apache.log4j.Logger;
32  
33  
34  public class ClusterController extends Thread
35  {
36    private static Logger        LOG                      =
37                                                              Logger.getLogger(ClusterController.class);
38  
39    private final CountDownLatch _startCountDown          = new CountDownLatch(1);
40    private final CountDownLatch _stopCountDown           = new CountDownLatch(1);
41    private final CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
42    private final String         _controllerMode;
43    private final String         _zkAddr;
44  
45    private ZkHelixTestManager   _manager;
46  
47    public ClusterController(String clusterName, String controllerName, String zkAddr) throws Exception
48    {
49      this(clusterName, controllerName, zkAddr, HelixControllerMain.STANDALONE.toString());
50    }
51  
52    public ClusterController(String clusterName,
53                             String controllerName,
54                             String zkAddr,
55                             String controllerMode) throws Exception
56    {
57      _controllerMode = controllerMode;
58      _zkAddr = zkAddr;
59  
60      if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
61      {
62        _manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER, zkAddr);
63      }
64      else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
65      {
66        _manager = new ZkHelixTestManager(clusterName, controllerName, InstanceType.CONTROLLER_PARTICIPANT, zkAddr);
67      }
68      else
69      {
70        throw new IllegalArgumentException("Controller mode: " + controllerMode
71            + " NOT recoginized");
72      }
73    }
74  
75    public ZkHelixTestManager getManager()
76    {
77      return _manager;
78    }
79  
80    public void syncStop()
81    {
82      if (_manager == null)
83      {
84        LOG.warn("manager already stopped");
85        return;
86      }
87  
88      _stopCountDown.countDown();
89      try
90      {
91        _waitStopFinishCountDown.await();
92      }
93      catch (InterruptedException e)
94      {
95        // TODO Auto-generated catch block
96        e.printStackTrace();
97      }
98    }
99  
100   public void syncStart()
101   {
102     // TODO: prevent start multiple times
103     
104     super.start();
105     try
106     {
107       _startCountDown.await();
108     }
109     catch (InterruptedException e)
110     {
111       // TODO Auto-generated catch block
112       e.printStackTrace();
113     }
114   }
115 
116   @Override
117   public void run()
118   {
119     try
120     {
121       try
122       {
123         if (_controllerMode.equals(HelixControllerMain.STANDALONE.toString()))
124         {
125           _manager.connect();
126         }
127         else if (_controllerMode.equals(HelixControllerMain.DISTRIBUTED.toString()))
128         {
129           DistClusterControllerStateModelFactory stateModelFactory =
130               new DistClusterControllerStateModelFactory(_zkAddr);
131 
132           StateMachineEngine stateMach = _manager.getStateMachineEngine();
133           stateMach.registerStateModelFactory("LeaderStandby", stateModelFactory);
134           _manager.connect();
135         }
136       }
137       catch (Exception e)
138       {
139         // TODO Auto-generated catch block
140         e.printStackTrace();
141       }
142       finally
143       {
144         _startCountDown.countDown();
145         _stopCountDown.await();
146       }
147     }
148     catch (Exception e)
149     {
150       // TODO Auto-generated catch block
151       e.printStackTrace();
152     }
153     finally
154     {
155       synchronized (_manager)
156       {
157         _manager.disconnect();
158         _manager = null;
159       }
160       _waitStopFinishCountDown.countDown();
161     }
162   }
163 }