View Javadoc

1   package org.apache.helix.participant;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.List;
23  
24  import org.apache.helix.ConfigChangeListener;
25  import org.apache.helix.ExternalViewChangeListener;
26  import org.apache.helix.HelixDataAccessor;
27  import org.apache.helix.HelixManager;
28  import org.apache.helix.LiveInstanceChangeListener;
29  import org.apache.helix.NotificationContext;
30  import org.apache.helix.NotificationContext.Type;
31  import org.apache.helix.PropertyKey.Builder;
32  import org.apache.helix.model.CurrentState;
33  import org.apache.helix.model.ExternalView;
34  import org.apache.helix.model.InstanceConfig;
35  import org.apache.helix.model.LiveInstance;
36  import org.apache.log4j.Logger;
37  
38  
39  public class CustomCodeInvoker implements
40      LiveInstanceChangeListener,
41      ConfigChangeListener,
42      ExternalViewChangeListener
43  {
44    private static Logger LOG = Logger.getLogger(CustomCodeInvoker.class);
45    private final CustomCodeCallbackHandler _callback;
46    private final String _partitionKey;
47  
48    public CustomCodeInvoker(CustomCodeCallbackHandler callback, String partitionKey)
49    {
50      _callback = callback;
51      _partitionKey = partitionKey;
52    }
53  
54    private void callParticipantCode(NotificationContext context)
55    {
56      // System.out.println("callback invoked. type:" + context.getType().toString());
57      if (context.getType() == Type.INIT || context.getType() == Type.CALLBACK)
58      {
59        // since ZkClient.unsubscribe() does not immediately remove listeners
60        // from zk, it is possible that two listeners exist when leadership transfers
61        // therefore, double check to make sure only one participant invokes the code
62        if (context.getType() == Type.CALLBACK)
63        {
64          HelixManager manager = context.getManager();
65          // DataAccessor accessor = manager.getDataAccessor();
66          HelixDataAccessor accessor = manager.getHelixDataAccessor();
67          Builder keyBuilder = accessor.keyBuilder();
68  
69          String instance = manager.getInstanceName();
70          String sessionId = manager.getSessionId();
71  
72          // get resource name from partition key: "PARTICIPANT_LEADER_XXX_0"
73          String resourceName = _partitionKey.substring(0, _partitionKey.lastIndexOf('_'));
74  
75          CurrentState curState =
76              accessor.getProperty(keyBuilder.currentState(instance,
77                                                           sessionId,
78                                                           resourceName));
79          if (curState == null)
80          {
81            return;
82          }
83  
84          String state = curState.getState(_partitionKey);
85          if (state == null || !state.equalsIgnoreCase("LEADER"))
86          {
87            return;
88          }
89        }
90  
91        try
92        {
93          _callback.onCallback(context);
94        }
95        catch (Exception e)
96        {
97          LOG.error("Error invoking callback:" + _callback, e);
98        }
99      }
100   }
101 
102   @Override
103   public void onLiveInstanceChange(List<LiveInstance> liveInstances,
104                                    NotificationContext changeContext)
105   {
106     LOG.info("onLiveInstanceChange() invoked");
107     callParticipantCode(changeContext);
108   }
109 
110   @Override
111   public void onConfigChange(List<InstanceConfig> configs,
112                              NotificationContext changeContext)
113   {
114     LOG.info("onConfigChange() invoked");
115     callParticipantCode(changeContext);
116   }
117 
118   @Override
119   public void onExternalViewChange(List<ExternalView> externalViewList,
120                                    NotificationContext changeContext)
121   {
122     LOG.info("onExternalViewChange() invoked");
123     callParticipantCode(changeContext);
124   }
125 
126 }