1 package org.apache.helix.participant;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.List;
23
24 import org.apache.helix.ConfigChangeListener;
25 import org.apache.helix.ExternalViewChangeListener;
26 import org.apache.helix.HelixDataAccessor;
27 import org.apache.helix.HelixManager;
28 import org.apache.helix.LiveInstanceChangeListener;
29 import org.apache.helix.NotificationContext;
30 import org.apache.helix.NotificationContext.Type;
31 import org.apache.helix.PropertyKey.Builder;
32 import org.apache.helix.model.CurrentState;
33 import org.apache.helix.model.ExternalView;
34 import org.apache.helix.model.InstanceConfig;
35 import org.apache.helix.model.LiveInstance;
36 import org.apache.log4j.Logger;
37
38
39 public class CustomCodeInvoker implements
40 LiveInstanceChangeListener,
41 ConfigChangeListener,
42 ExternalViewChangeListener
43 {
44 private static Logger LOG = Logger.getLogger(CustomCodeInvoker.class);
45 private final CustomCodeCallbackHandler _callback;
46 private final String _partitionKey;
47
48 public CustomCodeInvoker(CustomCodeCallbackHandler callback, String partitionKey)
49 {
50 _callback = callback;
51 _partitionKey = partitionKey;
52 }
53
54 private void callParticipantCode(NotificationContext context)
55 {
56
57 if (context.getType() == Type.INIT || context.getType() == Type.CALLBACK)
58 {
59
60
61
62 if (context.getType() == Type.CALLBACK)
63 {
64 HelixManager manager = context.getManager();
65
66 HelixDataAccessor accessor = manager.getHelixDataAccessor();
67 Builder keyBuilder = accessor.keyBuilder();
68
69 String instance = manager.getInstanceName();
70 String sessionId = manager.getSessionId();
71
72
73 String resourceName = _partitionKey.substring(0, _partitionKey.lastIndexOf('_'));
74
75 CurrentState curState =
76 accessor.getProperty(keyBuilder.currentState(instance,
77 sessionId,
78 resourceName));
79 if (curState == null)
80 {
81 return;
82 }
83
84 String state = curState.getState(_partitionKey);
85 if (state == null || !state.equalsIgnoreCase("LEADER"))
86 {
87 return;
88 }
89 }
90
91 try
92 {
93 _callback.onCallback(context);
94 }
95 catch (Exception e)
96 {
97 LOG.error("Error invoking callback:" + _callback, e);
98 }
99 }
100 }
101
102 @Override
103 public void onLiveInstanceChange(List<LiveInstance> liveInstances,
104 NotificationContext changeContext)
105 {
106 LOG.info("onLiveInstanceChange() invoked");
107 callParticipantCode(changeContext);
108 }
109
110 @Override
111 public void onConfigChange(List<InstanceConfig> configs,
112 NotificationContext changeContext)
113 {
114 LOG.info("onConfigChange() invoked");
115 callParticipantCode(changeContext);
116 }
117
118 @Override
119 public void onExternalViewChange(List<ExternalView> externalViewList,
120 NotificationContext changeContext)
121 {
122 LOG.info("onExternalViewChange() invoked");
123 callParticipantCode(changeContext);
124 }
125
126 }