1 package org.apache.helix.controller.stages;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.Iterator;
25 import java.util.List;
26 import java.util.Map;
27
28 import org.apache.helix.HelixDataAccessor;
29 import org.apache.helix.HelixManager;
30 import org.apache.helix.HelixManagerProperties;
31 import org.apache.helix.PropertyKey;
32 import org.apache.helix.PropertyKey.Builder;
33 import org.apache.helix.controller.pipeline.AbstractBaseStage;
34 import org.apache.helix.controller.pipeline.StageException;
35 import org.apache.helix.model.LiveInstance;
36 import org.apache.helix.model.Message;
37 import org.apache.helix.model.Partition;
38 import org.apache.helix.model.Resource;
39 import org.apache.log4j.Logger;
40
41
42 public class TaskAssignmentStage extends AbstractBaseStage
43 {
44 private static Logger logger = Logger.getLogger(TaskAssignmentStage.class);
45
46 @Override
47 public void process(ClusterEvent event) throws Exception
48 {
49 long startTime = System.currentTimeMillis();
50 logger.info("START TaskAssignmentStage.process()");
51
52 HelixManager manager = event.getAttribute("helixmanager");
53 Map<String, Resource> resourceMap =
54 event.getAttribute(AttributeName.RESOURCES.toString());
55 MessageThrottleStageOutput messageOutput =
56 event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
57 ClusterDataCache cache = event.getAttribute("ClusterDataCache");
58 Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
59
60 if (manager == null || resourceMap == null || messageOutput == null
61 || cache == null || liveInstanceMap == null)
62 {
63 throw new StageException("Missing attributes in event:" + event
64 + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
65 }
66
67 HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
68 List<Message> messagesToSend = new ArrayList<Message>();
69 for (String resourceName : resourceMap.keySet())
70 {
71 Resource resource = resourceMap.get(resourceName);
72 for (Partition partition : resource.getPartitions())
73 {
74 List<Message> messages = messageOutput.getMessages(resourceName, partition);
75 messagesToSend.addAll(messages);
76 }
77 }
78
79
80 List<Message> outputMessages = batchMessage(dataAccessor.keyBuilder(),
81 messagesToSend, resourceMap, liveInstanceMap, manager.getProperties());
82 sendMessages(dataAccessor, outputMessages);
83
84 long endTime = System.currentTimeMillis();
85 logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime)
86 + " ms");
87
88 }
89
90 List<Message> batchMessage(Builder keyBuilder,
91 List<Message> messages,
92 Map<String, Resource> resourceMap,
93 Map<String, LiveInstance> liveInstanceMap,
94 HelixManagerProperties properties)
95 {
96
97 Map<String, Message> batchMessages = new HashMap<String, Message>();
98 List<Message> outputMessages = new ArrayList<Message>();
99
100 Iterator<Message> iter = messages.iterator();
101 while (iter.hasNext())
102 {
103 Message message = iter.next();
104 String resourceName = message.getResourceName();
105 Resource resource = resourceMap.get(resourceName);
106
107 String instanceName = message.getTgtName();
108 LiveInstance liveInstance = liveInstanceMap.get(instanceName);
109 String participantVersion = null;
110 if (liveInstance != null) {
111 participantVersion = liveInstance.getHelixVersion();
112 }
113
114 if (resource == null || !resource.getBatchMessageMode()
115 || participantVersion == null
116 || !properties.isFeatureSupported("batch_message", participantVersion))
117 {
118 outputMessages.add(message);
119 continue;
120 }
121
122 String key =
123 keyBuilder.currentState(message.getTgtName(),
124 message.getTgtSessionId(),
125 message.getResourceName()).getPath()
126 + "/" + message.getFromState() + "/" + message.getToState();
127
128 if (!batchMessages.containsKey(key))
129 {
130 Message batchMessage = new Message(message.getRecord());
131 batchMessage.setBatchMessageMode(true);
132 outputMessages.add(batchMessage);
133 batchMessages.put(key, batchMessage);
134 }
135 batchMessages.get(key).addPartitionName(message.getPartitionName());
136 }
137
138 return outputMessages;
139 }
140
141 protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages)
142 {
143 if (messages == null || messages.isEmpty())
144 {
145 return;
146 }
147
148 Builder keyBuilder = dataAccessor.keyBuilder();
149
150 List<PropertyKey> keys = new ArrayList<PropertyKey>();
151 for (Message message : messages)
152 {
153 logger.info("Sending Message " + message.getMsgId() + " to " + message.getTgtName()
154 + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
155 + " from:" + message.getFromState() + " to:" + message.getToState());
156
157
158
159
160
161 keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
162 }
163
164 dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
165 }
166 }