1 package org.apache.helix.controller.stages;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.TreeMap;
28
29 import org.apache.helix.HelixDataAccessor;
30 import org.apache.helix.HelixDefinedState;
31 import org.apache.helix.HelixManager;
32 import org.apache.helix.NotificationContext;
33 import org.apache.helix.PropertyKey;
34 import org.apache.helix.PropertyKey.Builder;
35 import org.apache.helix.ZNRecord;
36 import org.apache.helix.ZNRecordDelta;
37 import org.apache.helix.ZNRecordDelta.MergeOperation;
38 import org.apache.helix.controller.pipeline.AbstractBaseStage;
39 import org.apache.helix.controller.pipeline.StageException;
40 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
41 import org.apache.helix.model.ExternalView;
42 import org.apache.helix.model.IdealState;
43 import org.apache.helix.model.Message;
44 import org.apache.helix.model.Message.MessageType;
45 import org.apache.helix.model.Partition;
46 import org.apache.helix.model.Resource;
47 import org.apache.helix.model.StatusUpdate;
48 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
49 import org.apache.log4j.Logger;
50
51 public class ExternalViewComputeStage extends AbstractBaseStage
52 {
53 private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
54
55 @Override
56 public void process(ClusterEvent event) throws Exception
57 {
58 long startTime = System.currentTimeMillis();
59 log.info("START ExternalViewComputeStage.process()");
60
61 HelixManager manager = event.getAttribute("helixmanager");
62 Map<String, Resource> resourceMap =
63 event.getAttribute(AttributeName.RESOURCES.toString());
64 ClusterDataCache cache = event.getAttribute("ClusterDataCache");
65
66 if (manager == null || resourceMap == null || cache == null)
67 {
68 throw new StageException("Missing attributes in event:" + event
69 + ". Requires ClusterManager|RESOURCES|DataCache");
70 }
71
72 HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
73 PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
74
75 CurrentStateOutput currentStateOutput =
76 event.getAttribute(AttributeName.CURRENT_STATE.toString());
77
78 List<ExternalView> newExtViews = new ArrayList<ExternalView>();
79 List<PropertyKey> keys = new ArrayList<PropertyKey>();
80
81 Map<String, ExternalView> curExtViews =
82 dataAccessor.getChildValuesMap(keyBuilder.externalViews());
83
84 for (String resourceName : resourceMap.keySet())
85 {
86 ExternalView view = new ExternalView(resourceName);
87
88
89
90 Resource resource = resourceMap.get(resourceName);
91 if (resource.getBucketSize() > 0)
92 {
93 view.setBucketSize(resource.getBucketSize());
94 }
95 else
96 {
97 view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
98 }
99
100 for (Partition partition : resource.getPartitions())
101 {
102 Map<String, String> currentStateMap =
103 currentStateOutput.getCurrentStateMap(resourceName, partition);
104 if (currentStateMap != null && currentStateMap.size() > 0)
105 {
106
107
108 for (String instance : currentStateMap.keySet())
109 {
110
111
112 view.setState(partition.getPartitionName(),
113 instance,
114 currentStateMap.get(instance));
115
116 }
117 }
118 }
119
120 ClusterStatusMonitor clusterStatusMonitor =
121 (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
122 IdealState idealState = cache._idealStateMap.get(view.getResourceName());
123 if(idealState != null)
124 {
125 if (clusterStatusMonitor != null && !idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
126 {
127 clusterStatusMonitor.onExternalViewChange(view,
128 cache._idealStateMap.get(view.getResourceName()));
129 }
130 }
131
132
133 ExternalView curExtView = curExtViews.get(resourceName);
134 if (curExtView == null || !curExtView.getRecord().equals(view.getRecord()))
135 {
136 keys.add(keyBuilder.externalView(resourceName));
137 newExtViews.add(view);
138
139
140
141
142 if(idealState != null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
143 {
144 updateScheduledTaskStatus(view, manager, idealState);
145 }
146 }
147 }
148
149
150
151
152 if (newExtViews.size() > 0)
153 {
154 dataAccessor.setChildren(keys, newExtViews);
155 }
156
157
158 for (String resourceName : curExtViews.keySet()) {
159 if (!resourceMap.keySet().contains(resourceName)) {
160 dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
161 }
162 }
163
164 long endTime = System.currentTimeMillis();
165 log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime)
166 + " ms");
167 }
168
169 private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager, IdealState taskQueueIdealState)
170 {
171 HelixDataAccessor accessor = manager.getHelixDataAccessor();
172 ZNRecord finishedTasks = new ZNRecord(ev.getResourceName());
173
174
175 Map<String, String> emptyMap = new HashMap<String, String>();
176 List<String> emptyList = new LinkedList<String>();
177
178 Map<String, Integer> controllerMsgIdCountMap = new HashMap<String, Integer>();
179 Map<String, Map<String, String>> controllerMsgUpdates = new HashMap<String, Map<String, String>>();
180
181 Builder keyBuilder = accessor.keyBuilder();
182
183 for(String taskPartitionName : ev.getPartitionSet())
184 {
185 for(String taskState : ev.getStateMap(taskPartitionName).values())
186 {
187 if(taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString()) || taskState.equalsIgnoreCase("COMPLETED"))
188 {
189 log.info(taskPartitionName + " finished as " + taskState);
190 finishedTasks.getListFields().put(taskPartitionName, emptyList);
191 finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
192
193
194 if(taskQueueIdealState.getRecord().getMapField(taskPartitionName) != null)
195 {
196 String controllerMsgId
197 = taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
198 if(controllerMsgId != null)
199 {
200 log.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
201 if(!controllerMsgUpdates.containsKey(controllerMsgId))
202 {
203 controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
204 }
205 controllerMsgUpdates.get(controllerMsgId).put(taskPartitionName, taskState);
206 }
207 }
208 }
209 }
210 }
211
212 for(String taskId : taskQueueIdealState.getPartitionSet())
213 {
214 String controllerMsgId
215 = taskQueueIdealState.getRecord().getMapField(taskId).get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
216 if(controllerMsgId != null)
217 {
218 if(!controllerMsgIdCountMap.containsKey(controllerMsgId))
219 {
220 controllerMsgIdCountMap.put(controllerMsgId, 0);
221 }
222 controllerMsgIdCountMap.put(controllerMsgId, (controllerMsgIdCountMap.get(controllerMsgId) + 1));
223 }
224 }
225
226 if(controllerMsgUpdates.size() > 0)
227 {
228 for(String controllerMsgId : controllerMsgUpdates.keySet())
229 {
230 PropertyKey controllerStatusUpdateKey
231 = keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), controllerMsgId);
232 StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey);
233 for(String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet())
234 {
235 Map<String, String> result = new HashMap<String, String>();
236 result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
237 controllerStatusUpdate.getRecord().setMapField("MessageResult " +
238 taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(Message.Attributes.TGT_NAME.toString()) + " " + taskPartitionName + " " +
239 taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(Message.Attributes.MSG_ID.toString())
240 , result);
241 }
242
243 if(controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCountMap.get(controllerMsgId).intValue())
244 {
245 int finishedTasksNum = 0;
246 int completedTasksNum = 0;
247 for(String key : controllerStatusUpdate.getRecord().getMapFields().keySet())
248 {
249 if(key.startsWith("MessageResult "))
250 {
251 finishedTasksNum ++;
252 }
253 if(controllerStatusUpdate.getRecord().getMapField(key).get("Result") != null)
254 {
255 if(controllerStatusUpdate.getRecord().getMapField(key).get("Result").equalsIgnoreCase("COMPLETED"))
256 {
257 completedTasksNum++;
258 }
259 }
260 }
261 Map<String, String> summary = new TreeMap<String, String>();
262 summary.put("TotalMessages:", "" + finishedTasksNum);
263 summary.put("CompletedMessages", "" + completedTasksNum);
264
265 controllerStatusUpdate.getRecord().setMapField("Summary", summary);
266 }
267
268 accessor.updateProperty(controllerStatusUpdateKey, controllerStatusUpdate);
269 }
270 }
271
272 if(finishedTasks.getListFields().size() > 0)
273 {
274 ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
275 List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
276 deltaList.add(znDelta);
277 IdealState delta = new IdealState(taskQueueIdealState.getResourceName());
278 delta.setDeltaList(deltaList);
279
280
281 keyBuilder = accessor.keyBuilder();
282 accessor.updateProperty(keyBuilder.idealStates(taskQueueIdealState.getResourceName()), delta);
283 }
284 }
285
286 }