View Javadoc

1   package org.apache.helix.participant;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.List;
25  
26  import org.apache.helix.HelixDataAccessor;
27  import org.apache.helix.HelixManager;
28  import org.apache.helix.HelixConstants.ChangeType;
29  import org.apache.helix.HelixConstants.StateModelToken;
30  import org.apache.helix.PropertyKey.Builder;
31  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
32  import org.apache.helix.manager.zk.ZNRecordSerializer;
33  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
34  import org.apache.helix.manager.zk.ZkClient;
35  import org.apache.helix.model.IdealState;
36  import org.apache.helix.model.IdealState.IdealStateModeProperty;
37  import org.apache.log4j.Logger;
38  
39  
40  /**
41   * This provides the ability for users to run a custom code in exactly one
42   * process using a LeaderStandBy state model. <br/>
43   * A typical use case is when one uses CUSTOMIZED ideal state mode where the
44   * assignment of partition to nodes needs to change dynamically as the nodes go
45   * online/offline.<br/>
46   * <code>
47   * HelixCustomCodeRunner runner = new HelixCustomCodeRunner(manager,ZK_ADDR);
48   * runner
49   *  .invoke(_callback)
50   *  .on(ChangeType.LIVE_INSTANCE, ChangeType.IdealState)
51   *  .usingLeaderStandbyModel("someUniqueId")
52   *  .run()
53   * </code>
54   *
55   *
56   */
57  public class HelixCustomCodeRunner
58  {
59    private static final String LEADER_STANDBY = "LeaderStandby";
60    private static Logger LOG = Logger.getLogger(HelixCustomCodeRunner.class);
61    private static String PARTICIPANT_LEADER = "PARTICIPANT_LEADER";
62  
63    private CustomCodeCallbackHandler _callback;
64    private List<ChangeType> _notificationTypes;
65    private String _resourceName;
66    private final HelixManager _manager;
67    private final String _zkAddr;
68    private GenericLeaderStandbyStateModelFactory _stateModelFty;
69  
70    /**
71     * Constructs a HelixCustomCodeRunner that will run exactly in one place
72     *
73     * @param manager
74     * @param zkAddr
75     */
76    public HelixCustomCodeRunner(HelixManager manager, String zkAddr)
77    {
78      _manager = manager;
79      _zkAddr = zkAddr;
80    }
81  
82    /**
83     * callback to invoke when there is a change in cluster state specified by on(
84     * notificationTypes) This callback must be idempotent which means they should
85     * not depend on what changed instead simply read the cluster data and act on
86     * it.
87     *
88     * @param callback
89     * @return
90     */
91    public HelixCustomCodeRunner invoke(CustomCodeCallbackHandler callback)
92    {
93      _callback = callback;
94      return this;
95    }
96  
97    /**
98     * ChangeTypes interested in, ParticipantLeaderCallback.callback method will
99     * be invoked on the
100    *
101    * @param notificationTypes
102    * @return
103    */
104   public HelixCustomCodeRunner on(ChangeType... notificationTypes)
105   {
106     _notificationTypes = Arrays.asList(notificationTypes);
107     return this;
108   }
109 
110   public HelixCustomCodeRunner usingLeaderStandbyModel(String id)
111   {
112     _resourceName = PARTICIPANT_LEADER + "_" + id;
113     return this;
114   }
115 
116   /**
117    * This method will be invoked when there is a change in any subscribed
118    * notificationTypes
119    *
120    * @throws Exception
121    */
122   public void start() throws Exception
123   {
124     if (_callback == null || _notificationTypes == null || _notificationTypes.size() == 0
125         || _resourceName == null)
126     {
127       throw new IllegalArgumentException("Require callback | notificationTypes | resourceName");
128     }
129 
130     LOG.info("Register participantLeader on " + _notificationTypes + " using " + _resourceName);
131 
132     _stateModelFty = new GenericLeaderStandbyStateModelFactory(_callback, _notificationTypes);
133 
134     StateMachineEngine stateMach = _manager.getStateMachineEngine();
135     stateMach.registerStateModelFactory(LEADER_STANDBY, _stateModelFty, _resourceName);
136     ZkClient zkClient = null;
137     try
138     {
139       // manually add ideal state for participant leader using LeaderStandby
140       // model
141 
142       zkClient = new ZkClient(_zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
143       zkClient.setZkSerializer(new ZNRecordSerializer());
144       HelixDataAccessor accessor = new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor(zkClient));
145       Builder keyBuilder = accessor.keyBuilder();
146 
147       IdealState idealState = new IdealState(_resourceName);
148       idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
149       idealState.setReplicas(StateModelToken.ANY_LIVEINSTANCE.toString());
150       idealState.setNumPartitions(1);
151       idealState.setStateModelDefRef(LEADER_STANDBY);
152       idealState.setStateModelFactoryName(_resourceName);
153       List<String> prefList = new ArrayList<String>(Arrays.asList(StateModelToken.ANY_LIVEINSTANCE
154           .toString()));
155       idealState.getRecord().setListField(_resourceName + "_0", prefList);
156 
157       List<String> idealStates = accessor.getChildNames(keyBuilder.idealStates());
158       while (idealStates == null || !idealStates.contains(_resourceName))
159       {
160         accessor.setProperty(keyBuilder.idealStates(_resourceName), idealState);
161         idealStates = accessor.getChildNames(keyBuilder.idealStates());
162       }
163 
164       LOG.info("Set idealState for participantLeader:" + _resourceName + ", idealState:"
165           + idealState);
166     } finally
167     {
168       if (zkClient != null && zkClient.getConnection() != null)
169 
170       {
171         zkClient.close();
172       }
173     }
174 
175   }
176 
177   /**
178    * Stop customer code runner
179    */
180   public void stop()
181   {
182     LOG.info("Removing stateModelFactory for " + _resourceName);
183     _manager.getStateMachineEngine().removeStateModelFactory(LEADER_STANDBY, _stateModelFty,
184         _resourceName);
185   }
186 }