1 package org.apache.helix.participant;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.lang.management.ManagementFactory;
23
24 import org.apache.helix.ControllerChangeListener;
25 import org.apache.helix.HelixDataAccessor;
26 import org.apache.helix.HelixManager;
27 import org.apache.helix.HelixManagerFactory;
28 import org.apache.helix.InstanceType;
29 import org.apache.helix.NotificationContext;
30 import org.apache.helix.PropertyKey.Builder;
31 import org.apache.helix.PropertyType;
32 import org.apache.helix.controller.GenericHelixController;
33 import org.apache.helix.controller.HelixControllerMain;
34 import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
35 import org.apache.helix.model.LeaderHistory;
36 import org.apache.helix.model.LiveInstance;
37 import org.apache.log4j.Logger;
38
39
40 public class DistClusterControllerElection implements ControllerChangeListener {
41 private static Logger LOG = Logger.getLogger(DistClusterControllerElection.class);
42 private final String _zkAddr;
43 private final GenericHelixController _controller = new GenericHelixController();
44 private HelixManager _leader;
45
46 public DistClusterControllerElection(String zkAddr) {
47 _zkAddr = zkAddr;
48 }
49
50
51
52
53
54
55 @Override
56 public synchronized void onControllerChange(NotificationContext changeContext) {
57 HelixManager manager = changeContext.getManager();
58 if (manager == null) {
59 LOG.error("missing attributes in changeContext. requires HelixManager");
60 return;
61 }
62
63 InstanceType type = manager.getInstanceType();
64 if (type != InstanceType.CONTROLLER && type != InstanceType.CONTROLLER_PARTICIPANT) {
65 LOG.error("fail to become controller because incorrect instanceType (was "
66 + type.toString() + ", requires CONTROLLER | CONTROLLER_PARTICIPANT)");
67 return;
68 }
69
70 try {
71 if (changeContext.getType().equals(NotificationContext.Type.INIT)
72 || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
73
74 HelixDataAccessor accessor = manager.getHelixDataAccessor();
75 Builder keyBuilder = accessor.keyBuilder();
76
77 while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
78 boolean success = tryUpdateController(manager);
79 if (success) {
80 updateHistory(manager);
81 if (type == InstanceType.CONTROLLER) {
82 HelixControllerMain.addListenersToController(manager, _controller);
83 manager.startTimerTasks();
84 } else if (type == InstanceType.CONTROLLER_PARTICIPANT) {
85 String clusterName = manager.getClusterName();
86 String controllerName = manager.getInstanceName();
87 _leader = HelixManagerFactory.getZKHelixManager(clusterName,
88 controllerName, InstanceType.CONTROLLER, _zkAddr);
89
90 _leader.connect();
91 _leader.startTimerTasks();
92 HelixControllerMain.addListenersToController(_leader, _controller);
93 }
94
95 }
96 }
97 } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
98
99 if (_leader != null) {
100 _leader.disconnect();
101 }
102 }
103
104 } catch (Exception e) {
105 LOG.error("Exception when trying to become leader", e);
106 }
107 }
108
109 private boolean tryUpdateController(HelixManager manager) {
110
111 HelixDataAccessor accessor = manager.getHelixDataAccessor();
112 Builder keyBuilder = accessor.keyBuilder();
113
114 LiveInstance leader = new LiveInstance(manager.getInstanceName());
115 try {
116 leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
117
118
119 leader.setSessionId(manager.getSessionId());
120 leader.setHelixVersion(manager.getVersion());
121 if (ZKPropertyTransferServer.getInstance() != null) {
122 String zkPropertyTransferServiceUrl = ZKPropertyTransferServer.getInstance()
123 .getWebserviceUrl();
124 if (zkPropertyTransferServiceUrl != null) {
125 leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
126 }
127 } else {
128 LOG.warn("ZKPropertyTransferServer instnace is null");
129 }
130 boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
131 if (success) {
132 return true;
133 } else {
134 LOG.info("Unable to become leader probably because some other controller becames the leader");
135 }
136 } catch (Exception e) {
137 LOG.error(
138 "Exception when trying to updating leader record in cluster:"
139 + manager.getClusterName()
140 + ". Need to check again whether leader node has been created or not",
141 e);
142 }
143
144 leader = accessor.getProperty(keyBuilder.controllerLeader());
145 if (leader != null) {
146 String leaderSessionId = leader.getSessionId();
147 LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
148 + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
149
150 if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) {
151 return true;
152 }
153 }
154 return false;
155 }
156
157 private void updateHistory(HelixManager manager) {
158 HelixDataAccessor accessor = manager.getHelixDataAccessor();
159 Builder keyBuilder = accessor.keyBuilder();
160
161 LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
162 if (history == null) {
163 history = new LeaderHistory(PropertyType.HISTORY.toString());
164 }
165 history.updateHistory(manager.getClusterName(), manager.getInstanceName());
166 accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
167 }
168 }