View Javadoc

1   package org.apache.helix.controller.stages;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.TreeMap;
28  
29  import org.apache.helix.HelixDataAccessor;
30  import org.apache.helix.HelixDefinedState;
31  import org.apache.helix.HelixManager;
32  import org.apache.helix.NotificationContext;
33  import org.apache.helix.PropertyKey;
34  import org.apache.helix.PropertyKey.Builder;
35  import org.apache.helix.ZNRecord;
36  import org.apache.helix.ZNRecordDelta;
37  import org.apache.helix.ZNRecordDelta.MergeOperation;
38  import org.apache.helix.controller.pipeline.AbstractBaseStage;
39  import org.apache.helix.controller.pipeline.StageException;
40  import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
41  import org.apache.helix.model.ExternalView;
42  import org.apache.helix.model.IdealState;
43  import org.apache.helix.model.Message;
44  import org.apache.helix.model.Message.MessageType;
45  import org.apache.helix.model.Partition;
46  import org.apache.helix.model.Resource;
47  import org.apache.helix.model.StatusUpdate;
48  import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
49  import org.apache.log4j.Logger;
50  
51  public class ExternalViewComputeStage extends AbstractBaseStage
52  {
53    private static Logger log = Logger.getLogger(ExternalViewComputeStage.class);
54  
55    @Override
56    public void process(ClusterEvent event) throws Exception
57    {
58      long startTime = System.currentTimeMillis();
59      log.info("START ExternalViewComputeStage.process()");
60  
61      HelixManager manager = event.getAttribute("helixmanager");
62      Map<String, Resource> resourceMap =
63          event.getAttribute(AttributeName.RESOURCES.toString());
64      ClusterDataCache cache = event.getAttribute("ClusterDataCache");
65  
66      if (manager == null || resourceMap == null || cache == null)
67      {
68        throw new StageException("Missing attributes in event:" + event
69            + ". Requires ClusterManager|RESOURCES|DataCache");
70      }
71  
72      HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
73      PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
74  
75      CurrentStateOutput currentStateOutput =
76          event.getAttribute(AttributeName.CURRENT_STATE.toString());
77  
78      List<ExternalView> newExtViews = new ArrayList<ExternalView>();
79      List<PropertyKey> keys = new ArrayList<PropertyKey>();
80  
81      Map<String, ExternalView> curExtViews =
82            dataAccessor.getChildValuesMap(keyBuilder.externalViews());
83  
84      for (String resourceName : resourceMap.keySet())
85      {
86        ExternalView view = new ExternalView(resourceName);
87        // view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
88        // if resource ideal state has bucket size, set it
89        // otherwise resource has been dropped, use bucket size from current state instead
90        Resource resource = resourceMap.get(resourceName);
91        if (resource.getBucketSize() > 0)
92        {
93          view.setBucketSize(resource.getBucketSize());
94        }
95        else
96        {
97          view.setBucketSize(currentStateOutput.getBucketSize(resourceName));
98        }
99  
100       for (Partition partition : resource.getPartitions())
101       {
102         Map<String, String> currentStateMap =
103             currentStateOutput.getCurrentStateMap(resourceName, partition);
104         if (currentStateMap != null && currentStateMap.size() > 0)
105         {
106           // Set<String> disabledInstances
107           // = cache.getDisabledInstancesForResource(resource.toString());
108           for (String instance : currentStateMap.keySet())
109           {
110             // if (!disabledInstances.contains(instance))
111             // {
112             view.setState(partition.getPartitionName(),
113                           instance,
114                           currentStateMap.get(instance));
115             // }
116           }
117         }
118       }
119       // Update cluster status monitor mbean
120       ClusterStatusMonitor clusterStatusMonitor =
121           (ClusterStatusMonitor) event.getAttribute("clusterStatusMonitor");
122       IdealState idealState = cache._idealStateMap.get(view.getResourceName());
123       if(idealState != null)
124       {
125         if (clusterStatusMonitor != null && !idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
126         {
127           clusterStatusMonitor.onExternalViewChange(view,
128                                                   cache._idealStateMap.get(view.getResourceName()));
129         }
130       }
131 
132       // compare the new external view with current one, set only on different
133       ExternalView curExtView = curExtViews.get(resourceName);
134       if (curExtView == null || !curExtView.getRecord().equals(view.getRecord()))
135       {
136         keys.add(keyBuilder.externalView(resourceName));
137         newExtViews.add(view);
138 
139         // For SCHEDULER_TASK_RESOURCE resource group (helix task queue), we need to find out which task 
140         // partitions are finished (COMPLETED or ERROR), update the status update of the original scheduler 
141         // message, and then remove the partitions from the ideal state
142         if(idealState != null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
143         {
144           updateScheduledTaskStatus(view, manager, idealState);
145         }
146       }
147     }
148     // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at all. 
149     // Are there any entity that will be interested in its change?
150 
151     // add/update external-views
152     if (newExtViews.size() > 0)
153     {
154       dataAccessor.setChildren(keys, newExtViews);
155     }
156 
157     // remove dead external-views
158     for (String resourceName : curExtViews.keySet()) {
159         if (!resourceMap.keySet().contains(resourceName)) {
160             dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
161         }
162     }
163 
164     long endTime = System.currentTimeMillis();
165     log.info("END ExternalViewComputeStage.process(). took: " + (endTime - startTime)
166         + " ms");
167   }
168   
169   private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager, IdealState taskQueueIdealState)
170   {
171     HelixDataAccessor accessor = manager.getHelixDataAccessor();
172     ZNRecord finishedTasks = new ZNRecord(ev.getResourceName());
173     
174     // Place holder for finished partitions
175     Map<String, String> emptyMap = new HashMap<String, String>();
176     List<String> emptyList = new LinkedList<String>();
177     
178     Map<String, Integer> controllerMsgIdCountMap = new HashMap<String, Integer>();
179     Map<String, Map<String, String>> controllerMsgUpdates = new HashMap<String, Map<String, String>>();
180     
181     Builder keyBuilder = accessor.keyBuilder();
182           
183     for(String taskPartitionName : ev.getPartitionSet())
184     {
185       for(String taskState : ev.getStateMap(taskPartitionName).values())
186       {
187         if(taskState.equalsIgnoreCase(HelixDefinedState.ERROR.toString()) || taskState.equalsIgnoreCase("COMPLETED"))
188         {
189           log.info(taskPartitionName + " finished as " + taskState);
190           finishedTasks.getListFields().put(taskPartitionName, emptyList);
191           finishedTasks.getMapFields().put(taskPartitionName, emptyMap);
192           
193           // Update original scheduler message status update
194           if(taskQueueIdealState.getRecord().getMapField(taskPartitionName) != null)
195           {
196             String controllerMsgId 
197               = taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
198             if(controllerMsgId != null)
199             {
200               log.info(taskPartitionName + " finished with controllerMsg " + controllerMsgId);
201               if(!controllerMsgUpdates.containsKey(controllerMsgId))
202               {
203                 controllerMsgUpdates.put(controllerMsgId, new HashMap<String, String>());
204               }
205               controllerMsgUpdates.get(controllerMsgId).put(taskPartitionName, taskState);
206             }
207           }
208         }
209       }
210     }
211     // fill the controllerMsgIdCountMap
212     for(String taskId : taskQueueIdealState.getPartitionSet())
213     {
214       String controllerMsgId 
215         = taskQueueIdealState.getRecord().getMapField(taskId).get(DefaultSchedulerMessageHandlerFactory.CONTROLLER_MSG_ID);
216       if(controllerMsgId != null)
217       {
218         if(!controllerMsgIdCountMap.containsKey(controllerMsgId))
219         {
220           controllerMsgIdCountMap.put(controllerMsgId, 0);
221         }
222         controllerMsgIdCountMap.put(controllerMsgId, (controllerMsgIdCountMap.get(controllerMsgId) + 1));
223       }
224     }
225     
226     if(controllerMsgUpdates.size() > 0)
227     {
228       for(String controllerMsgId : controllerMsgUpdates.keySet())
229       {
230         PropertyKey controllerStatusUpdateKey 
231           = keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), controllerMsgId);
232         StatusUpdate controllerStatusUpdate = accessor.getProperty(controllerStatusUpdateKey);
233         for(String taskPartitionName : controllerMsgUpdates.get(controllerMsgId).keySet())
234         {
235           Map<String, String> result = new HashMap<String, String>();
236           result.put("Result", controllerMsgUpdates.get(controllerMsgId).get(taskPartitionName));
237           controllerStatusUpdate.getRecord().setMapField("MessageResult "  + 
238              taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(Message.Attributes.TGT_NAME.toString()) + " " + taskPartitionName + " " + 
239              taskQueueIdealState.getRecord().getMapField(taskPartitionName).get(Message.Attributes.MSG_ID.toString())
240              , result);
241         }
242         // All done for the scheduled tasks that came from controllerMsgId, add summary for it
243         if(controllerMsgUpdates.get(controllerMsgId).size() == controllerMsgIdCountMap.get(controllerMsgId).intValue())
244         {
245           int finishedTasksNum = 0;
246           int completedTasksNum = 0;
247           for(String key : controllerStatusUpdate.getRecord().getMapFields().keySet())
248           {
249             if(key.startsWith("MessageResult "))
250             {
251               finishedTasksNum ++;
252             }
253             if(controllerStatusUpdate.getRecord().getMapField(key).get("Result") != null)
254             {
255               if(controllerStatusUpdate.getRecord().getMapField(key).get("Result").equalsIgnoreCase("COMPLETED"))
256               {
257                 completedTasksNum++;
258               }
259             }
260           }
261           Map<String, String> summary = new TreeMap<String, String>();
262           summary.put("TotalMessages:", "" + finishedTasksNum);
263           summary.put("CompletedMessages", "" + completedTasksNum);
264           
265           controllerStatusUpdate.getRecord().setMapField("Summary", summary);
266         }
267         // Update the statusUpdate of controllerMsgId
268         accessor.updateProperty(controllerStatusUpdateKey, controllerStatusUpdate);
269       }
270     }
271     
272     if(finishedTasks.getListFields().size() > 0)
273     {
274       ZNRecordDelta znDelta = new ZNRecordDelta(finishedTasks, MergeOperation.SUBTRACT);
275       List<ZNRecordDelta> deltaList = new LinkedList<ZNRecordDelta>();
276       deltaList.add(znDelta);
277       IdealState delta = new IdealState(taskQueueIdealState.getResourceName());
278       delta.setDeltaList(deltaList);
279 
280       // Remove the finished (COMPLETED or ERROR) tasks from the SCHEDULER_TASK_RESOURCE idealstate
281       keyBuilder = accessor.keyBuilder();
282       accessor.updateProperty(keyBuilder.idealStates(taskQueueIdealState.getResourceName()), delta);
283     }
284   }
285 
286 }