View Javadoc

1   package org.apache.helix.participant;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.helix.HelixManager;
23  import org.apache.helix.HelixManagerFactory;
24  import org.apache.helix.InstanceType;
25  import org.apache.helix.NotificationContext;
26  import org.apache.helix.model.Message;
27  import org.apache.helix.participant.statemachine.StateModel;
28  import org.apache.helix.participant.statemachine.StateModelInfo;
29  import org.apache.helix.participant.statemachine.StateModelParser;
30  import org.apache.helix.participant.statemachine.StateTransitionError;
31  import org.apache.helix.participant.statemachine.Transition;
32  import org.apache.log4j.Logger;
33  
34  
35  
36  @StateModelInfo(initialState = "OFFLINE", states = { "LEADER", "STANDBY" })
37  public class DistClusterControllerStateModel extends StateModel
38  {
39    private static Logger logger = Logger.getLogger(DistClusterControllerStateModel.class);
40    private HelixManager _controller = null;
41    private final String _zkAddr;
42  
43    public DistClusterControllerStateModel(String zkAddr)
44    {
45      StateModelParser parser = new StateModelParser();
46      _currentState = parser.getInitialState(DistClusterControllerStateModel.class);
47      _zkAddr = zkAddr;
48    }
49  
50    @Transition(to="STANDBY",from="OFFLINE")
51    public void onBecomeStandbyFromOffline(Message message, NotificationContext context)
52    {
53      logger.info("Becoming standby from offline");
54    }
55  
56    @Transition(to="LEADER",from="STANDBY")
57    public void onBecomeLeaderFromStandby(Message message, NotificationContext context)
58    throws Exception
59    {
60      String clusterName = message.getPartitionName();
61      String controllerName = message.getTgtName();
62  
63      logger.info(controllerName + " becomes leader from standby for " + clusterName);
64      // System.out.println(controllerName + " becomes leader from standby for " + clusterName);
65  
66      if (_controller == null)
67      {
68        _controller = HelixManagerFactory
69            .getZKHelixManager(clusterName, controllerName, InstanceType.CONTROLLER, _zkAddr);
70        _controller.connect();
71        _controller.startTimerTasks();
72      }
73      else
74      {
75        logger.error("controller already exists:" + _controller.getInstanceName()
76                     + " for " + clusterName);
77      }
78  
79    }
80  
81    @Transition(to="STANDBY",from="LEADER")
82    public void onBecomeStandbyFromLeader(Message message, NotificationContext context)
83    {
84      String clusterName = message.getPartitionName();
85      String controllerName = message.getTgtName();
86  
87      logger.info(controllerName + " becoming standby from leader for " + clusterName);
88  
89      if (_controller != null)
90      {
91        _controller.disconnect();
92        _controller = null;
93      }
94      else
95      {
96        logger.error("No controller exists for " + clusterName);
97      }
98    }
99  
100   @Transition(to="OFFLINE",from="STANDBY")
101   public void onBecomeOfflineFromStandby(Message message, NotificationContext context)
102   {
103     String clusterName = message.getPartitionName();
104     String controllerName = message.getTgtName();
105 
106     logger.info(controllerName + " becoming offline from standby for cluster:" + clusterName);
107 
108   }
109 
110   @Transition(to="DROPPED",from="OFFLINE")
111   public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
112   {
113     logger.info("Becoming dropped from offline");
114   }
115 
116   @Transition(to="OFFLINE",from="DROPPED")
117   public void onBecomeOfflineFromDropped(Message message, NotificationContext context)
118   {
119     logger.info("Becoming offline from dropped");
120   }
121 
122 
123   @Override
124   public void rollbackOnError(Message message, NotificationContext context,
125                               StateTransitionError error)
126   {
127     String clusterName = message.getPartitionName();
128     String controllerName = message.getTgtName();
129 
130     logger.error(controllerName + " rollbacks on error for " + clusterName);
131 
132     if (_controller != null)
133     {
134       _controller.disconnect();
135       _controller = null;
136     }
137 
138   }
139 
140   @Override
141   public void reset()
142   {
143     if (_controller != null)
144     {
145 //      System.out.println("disconnect " + _controller.getInstanceName()
146 //                         + "(" + _controller.getInstanceType()
147 //                         + ") from " + _controller.getClusterName());
148       _controller.disconnect();
149       _controller = null;
150     }
151 
152   }
153 }