1 package org.apache.helix.participant;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.List;
25
26 import org.apache.helix.HelixDataAccessor;
27 import org.apache.helix.HelixManager;
28 import org.apache.helix.HelixConstants.ChangeType;
29 import org.apache.helix.HelixConstants.StateModelToken;
30 import org.apache.helix.PropertyKey.Builder;
31 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
32 import org.apache.helix.manager.zk.ZNRecordSerializer;
33 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
34 import org.apache.helix.manager.zk.ZkClient;
35 import org.apache.helix.model.IdealState;
36 import org.apache.helix.model.IdealState.IdealStateModeProperty;
37 import org.apache.log4j.Logger;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 public class HelixCustomCodeRunner
58 {
59 private static final String LEADER_STANDBY = "LeaderStandby";
60 private static Logger LOG = Logger.getLogger(HelixCustomCodeRunner.class);
61 private static String PARTICIPANT_LEADER = "PARTICIPANT_LEADER";
62
63 private CustomCodeCallbackHandler _callback;
64 private List<ChangeType> _notificationTypes;
65 private String _resourceName;
66 private final HelixManager _manager;
67 private final String _zkAddr;
68 private GenericLeaderStandbyStateModelFactory _stateModelFty;
69
70
71
72
73
74
75
76 public HelixCustomCodeRunner(HelixManager manager, String zkAddr)
77 {
78 _manager = manager;
79 _zkAddr = zkAddr;
80 }
81
82
83
84
85
86
87
88
89
90
91 public HelixCustomCodeRunner invoke(CustomCodeCallbackHandler callback)
92 {
93 _callback = callback;
94 return this;
95 }
96
97
98
99
100
101
102
103
104 public HelixCustomCodeRunner on(ChangeType... notificationTypes)
105 {
106 _notificationTypes = Arrays.asList(notificationTypes);
107 return this;
108 }
109
110 public HelixCustomCodeRunner usingLeaderStandbyModel(String id)
111 {
112 _resourceName = PARTICIPANT_LEADER + "_" + id;
113 return this;
114 }
115
116
117
118
119
120
121
122 public void start() throws Exception
123 {
124 if (_callback == null || _notificationTypes == null || _notificationTypes.size() == 0
125 || _resourceName == null)
126 {
127 throw new IllegalArgumentException("Require callback | notificationTypes | resourceName");
128 }
129
130 LOG.info("Register participantLeader on " + _notificationTypes + " using " + _resourceName);
131
132 _stateModelFty = new GenericLeaderStandbyStateModelFactory(_callback, _notificationTypes);
133
134 StateMachineEngine stateMach = _manager.getStateMachineEngine();
135 stateMach.registerStateModelFactory(LEADER_STANDBY, _stateModelFty, _resourceName);
136 ZkClient zkClient = null;
137 try
138 {
139
140
141
142 zkClient = new ZkClient(_zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
143 zkClient.setZkSerializer(new ZNRecordSerializer());
144 HelixDataAccessor accessor = new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor(zkClient));
145 Builder keyBuilder = accessor.keyBuilder();
146
147 IdealState idealState = new IdealState(_resourceName);
148 idealState.setIdealStateMode(IdealStateModeProperty.AUTO.toString());
149 idealState.setReplicas(StateModelToken.ANY_LIVEINSTANCE.toString());
150 idealState.setNumPartitions(1);
151 idealState.setStateModelDefRef(LEADER_STANDBY);
152 idealState.setStateModelFactoryName(_resourceName);
153 List<String> prefList = new ArrayList<String>(Arrays.asList(StateModelToken.ANY_LIVEINSTANCE
154 .toString()));
155 idealState.getRecord().setListField(_resourceName + "_0", prefList);
156
157 List<String> idealStates = accessor.getChildNames(keyBuilder.idealStates());
158 while (idealStates == null || !idealStates.contains(_resourceName))
159 {
160 accessor.setProperty(keyBuilder.idealStates(_resourceName), idealState);
161 idealStates = accessor.getChildNames(keyBuilder.idealStates());
162 }
163
164 LOG.info("Set idealState for participantLeader:" + _resourceName + ", idealState:"
165 + idealState);
166 } finally
167 {
168 if (zkClient != null && zkClient.getConnection() != null)
169
170 {
171 zkClient.close();
172 }
173 }
174
175 }
176
177
178
179
180 public void stop()
181 {
182 LOG.info("Removing stateModelFactory for " + _resourceName);
183 _manager.getStateMachineEngine().removeStateModelFactory(LEADER_STANDBY, _stateModelFty,
184 _resourceName);
185 }
186 }