View Javadoc

1   package org.apache.helix.controller.stages;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  
28  import org.apache.helix.HelixDataAccessor;
29  import org.apache.helix.HelixManager;
30  import org.apache.helix.HelixManagerProperties;
31  import org.apache.helix.PropertyKey;
32  import org.apache.helix.PropertyKey.Builder;
33  import org.apache.helix.controller.pipeline.AbstractBaseStage;
34  import org.apache.helix.controller.pipeline.StageException;
35  import org.apache.helix.model.LiveInstance;
36  import org.apache.helix.model.Message;
37  import org.apache.helix.model.Partition;
38  import org.apache.helix.model.Resource;
39  import org.apache.log4j.Logger;
40  
41  
42  public class TaskAssignmentStage extends AbstractBaseStage
43  {
44    private static Logger logger = Logger.getLogger(TaskAssignmentStage.class);
45  
46    @Override
47    public void process(ClusterEvent event) throws Exception
48    {
49      long startTime = System.currentTimeMillis();
50      logger.info("START TaskAssignmentStage.process()");
51  
52      HelixManager manager = event.getAttribute("helixmanager");
53      Map<String, Resource> resourceMap =
54          event.getAttribute(AttributeName.RESOURCES.toString());
55      MessageThrottleStageOutput messageOutput =
56          event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
57      ClusterDataCache cache = event.getAttribute("ClusterDataCache");
58      Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
59  
60      if (manager == null || resourceMap == null || messageOutput == null
61          || cache == null || liveInstanceMap == null)
62      {
63        throw new StageException("Missing attributes in event:" + event
64            + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
65      }
66  
67      HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
68      List<Message> messagesToSend = new ArrayList<Message>();
69      for (String resourceName : resourceMap.keySet())
70      {
71        Resource resource = resourceMap.get(resourceName);
72        for (Partition partition : resource.getPartitions())
73        {
74          List<Message> messages = messageOutput.getMessages(resourceName, partition);
75          messagesToSend.addAll(messages);
76        }
77      }
78  
79      
80      List<Message> outputMessages = batchMessage(dataAccessor.keyBuilder(), 
81                    messagesToSend, resourceMap, liveInstanceMap, manager.getProperties());
82      sendMessages(dataAccessor, outputMessages);
83  
84      long endTime = System.currentTimeMillis();
85      logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime)
86          + " ms");
87  
88    }
89  
90    List<Message> batchMessage(Builder keyBuilder,
91                               List<Message> messages,
92                               Map<String, Resource> resourceMap,
93                               Map<String, LiveInstance> liveInstanceMap,
94                               HelixManagerProperties properties)
95    {
96      // group messages by its CurrentState path + "/" + fromState + "/" + toState
97      Map<String, Message> batchMessages = new HashMap<String, Message>();
98      List<Message> outputMessages = new ArrayList<Message>();
99  
100     Iterator<Message> iter = messages.iterator();
101     while (iter.hasNext())
102     {
103       Message message = iter.next();
104       String resourceName = message.getResourceName();
105       Resource resource = resourceMap.get(resourceName);
106       
107       String instanceName = message.getTgtName();
108       LiveInstance liveInstance = liveInstanceMap.get(instanceName);
109       String participantVersion = null;
110       if (liveInstance != null) {
111         participantVersion = liveInstance.getHelixVersion();
112       }
113 
114       if (resource == null || !resource.getBatchMessageMode() 
115           || participantVersion == null 
116           || !properties.isFeatureSupported("batch_message", participantVersion))
117       {
118         outputMessages.add(message);
119         continue;
120       }
121 
122       String key =
123           keyBuilder.currentState(message.getTgtName(),
124                                   message.getTgtSessionId(),
125                                   message.getResourceName()).getPath()
126               + "/" + message.getFromState() + "/" + message.getToState();
127 
128       if (!batchMessages.containsKey(key))
129       {
130         Message batchMessage = new Message(message.getRecord());
131         batchMessage.setBatchMessageMode(true);
132         outputMessages.add(batchMessage);
133         batchMessages.put(key, batchMessage);
134       }
135       batchMessages.get(key).addPartitionName(message.getPartitionName());
136     }
137 
138     return outputMessages;
139   }
140 
141   protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages)
142   {
143     if (messages == null || messages.isEmpty())
144     {
145       return;
146     }
147 
148     Builder keyBuilder = dataAccessor.keyBuilder();
149 
150     List<PropertyKey> keys = new ArrayList<PropertyKey>();
151     for (Message message : messages)
152     {
153       logger.info("Sending Message " + message.getMsgId() + " to " + message.getTgtName()
154           + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
155           + " from:" + message.getFromState() + " to:" + message.getToState());
156 
157 //      System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " + message.getTgtName()
158 //              + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
159 //              + " from: " + message.getFromState() + " to: " + message.getToState());
160 
161       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
162     }
163 
164     dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
165   }
166 }