View Javadoc

1   package org.apache.helix.participant;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.lang.management.ManagementFactory;
23  
24  import org.apache.helix.ControllerChangeListener;
25  import org.apache.helix.HelixDataAccessor;
26  import org.apache.helix.HelixManager;
27  import org.apache.helix.HelixManagerFactory;
28  import org.apache.helix.InstanceType;
29  import org.apache.helix.NotificationContext;
30  import org.apache.helix.PropertyKey.Builder;
31  import org.apache.helix.PropertyType;
32  import org.apache.helix.controller.GenericHelixController;
33  import org.apache.helix.controller.HelixControllerMain;
34  import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
35  import org.apache.helix.model.LeaderHistory;
36  import org.apache.helix.model.LiveInstance;
37  import org.apache.log4j.Logger;
38  
39  // TODO: merge with GenericHelixController
40  public class DistClusterControllerElection implements ControllerChangeListener {
41      private static Logger LOG = Logger.getLogger(DistClusterControllerElection.class);
42      private final String _zkAddr;
43      private final GenericHelixController _controller = new GenericHelixController();
44      private HelixManager _leader;
45  
46      public DistClusterControllerElection(String zkAddr) {
47  	_zkAddr = zkAddr;
48      }
49  
50      /**
51       * may be accessed by multiple threads: zk-client thread and
52       * ZkHelixManager.disconnect()->reset() TODO: Refactor accessing
53       * HelixMaangerMain class statically
54       */
55      @Override
56      public synchronized void onControllerChange(NotificationContext changeContext) {
57  	HelixManager manager = changeContext.getManager();
58  	if (manager == null) {
59  	    LOG.error("missing attributes in changeContext. requires HelixManager");
60  	    return;
61  	}
62  
63  	InstanceType type = manager.getInstanceType();
64  	if (type != InstanceType.CONTROLLER && type != InstanceType.CONTROLLER_PARTICIPANT) {
65  	    LOG.error("fail to become controller because incorrect instanceType (was "
66  		    + type.toString() + ", requires CONTROLLER | CONTROLLER_PARTICIPANT)");
67  	    return;
68  	}
69  
70  	try {
71  	    if (changeContext.getType().equals(NotificationContext.Type.INIT)
72  		    || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
73  		// DataAccessor dataAccessor = manager.getDataAccessor();
74  		HelixDataAccessor accessor = manager.getHelixDataAccessor();
75  		Builder keyBuilder = accessor.keyBuilder();
76  
77  		while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
78  		    boolean success = tryUpdateController(manager);
79  		    if (success) {
80  			updateHistory(manager);
81  			if (type == InstanceType.CONTROLLER) {
82  			    HelixControllerMain.addListenersToController(manager, _controller);
83  			    manager.startTimerTasks();
84  			} else if (type == InstanceType.CONTROLLER_PARTICIPANT) {
85  			    String clusterName = manager.getClusterName();
86  			    String controllerName = manager.getInstanceName();
87  			    _leader = HelixManagerFactory.getZKHelixManager(clusterName,
88  				    controllerName, InstanceType.CONTROLLER, _zkAddr);
89  
90  			    _leader.connect();
91  			    _leader.startTimerTasks();
92  			    HelixControllerMain.addListenersToController(_leader, _controller);
93  			}
94  
95  		    }
96  		}
97  	    } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
98  
99  		if (_leader != null) {
100 		    _leader.disconnect();
101 		}
102 	    }
103 
104 	} catch (Exception e) {
105 	    LOG.error("Exception when trying to become leader", e);
106 	}
107     }
108 
109     private boolean tryUpdateController(HelixManager manager) {
110 	// DataAccessor dataAccessor = manager.getDataAccessor();
111 	HelixDataAccessor accessor = manager.getHelixDataAccessor();
112 	Builder keyBuilder = accessor.keyBuilder();
113 
114 	LiveInstance leader = new LiveInstance(manager.getInstanceName());
115 	try {
116 	    leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
117 	    // TODO: this session id is not the leader's session id in
118 	    // distributed mode
119 	    leader.setSessionId(manager.getSessionId());
120 	    leader.setHelixVersion(manager.getVersion());
121 	    if (ZKPropertyTransferServer.getInstance() != null) {
122 		String zkPropertyTransferServiceUrl = ZKPropertyTransferServer.getInstance()
123 		        .getWebserviceUrl();
124 		if (zkPropertyTransferServiceUrl != null) {
125 		    leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
126 		}
127 	    } else {
128 		LOG.warn("ZKPropertyTransferServer instnace is null");
129 	    }
130 	    boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
131 	    if (success) {
132 		return true;
133 	    } else {
134 		LOG.info("Unable to become leader probably because some other controller becames the leader");
135 	    }
136 	} catch (Exception e) {
137 	    LOG.error(
138 		    "Exception when trying to updating leader record in cluster:"
139 		            + manager.getClusterName()
140 		            + ". Need to check again whether leader node has been created or not",
141 		    e);
142 	}
143 
144 	leader = accessor.getProperty(keyBuilder.controllerLeader());
145 	if (leader != null) {
146 	    String leaderSessionId = leader.getSessionId();
147 	    LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
148 		    + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
149 
150 	    if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) {
151 		return true;
152 	    }
153 	}
154 	return false;
155     }
156 
157     private void updateHistory(HelixManager manager) {
158 	HelixDataAccessor accessor = manager.getHelixDataAccessor();
159 	Builder keyBuilder = accessor.keyBuilder();
160 
161 	LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
162 	if (history == null) {
163 	    history = new LeaderHistory(PropertyType.HISTORY.toString());
164 	}
165 	history.updateHistory(manager.getClusterName(), manager.getInstanceName());
166 	accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
167     }
168 }