1 package org.apache.helix.controller.stages;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.UUID;
27
28 import org.apache.helix.HelixManager;
29 import org.apache.helix.controller.pipeline.AbstractBaseStage;
30 import org.apache.helix.controller.pipeline.StageException;
31 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
32 import org.apache.helix.model.IdealState;
33 import org.apache.helix.model.LiveInstance;
34 import org.apache.helix.model.Message;
35 import org.apache.helix.model.Partition;
36 import org.apache.helix.model.Resource;
37 import org.apache.helix.model.StateModelDefinition;
38 import org.apache.helix.model.Message.MessageState;
39 import org.apache.helix.model.Message.MessageType;
40 import org.apache.log4j.Logger;
41
42
43
44
45
46 public class MessageGenerationPhase extends AbstractBaseStage
47 {
48 private static Logger logger = Logger.getLogger(MessageGenerationPhase.class);
49
50 @Override
51 public void process(ClusterEvent event) throws Exception
52 {
53 HelixManager manager = event.getAttribute("helixmanager");
54 ClusterDataCache cache = event.getAttribute("ClusterDataCache");
55 Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
56 CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE
57 .toString());
58 BestPossibleStateOutput bestPossibleStateOutput = event
59 .getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
60 if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
61 || bestPossibleStateOutput == null)
62 {
63 throw new StageException("Missing attributes in event:" + event
64 + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
65 }
66
67 Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
68 Map<String, String> sessionIdMap = new HashMap<String, String>();
69
70 for (LiveInstance liveInstance : liveInstances.values())
71 {
72 sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
73 }
74 MessageGenerationOutput output = new MessageGenerationOutput();
75
76 for (String resourceName : resourceMap.keySet())
77 {
78 Resource resource = resourceMap.get(resourceName);
79 int bucketSize = resource.getBucketSize();
80
81 StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
82
83 for (Partition partition : resource.getPartitions())
84 {
85 Map<String, String> instanceStateMap = bestPossibleStateOutput.getInstanceStateMap(
86 resourceName, partition);
87
88
89
90
91 Map<String, List<Message>> messageMap = new HashMap<String, List<Message>>();
92
93 for (String instanceName : instanceStateMap.keySet())
94 {
95 String desiredState = instanceStateMap.get(instanceName);
96
97 String currentState = currentStateOutput.getCurrentState(resourceName, partition,
98 instanceName);
99 if (currentState == null)
100 {
101 currentState = stateModelDef.getInitialState();
102 }
103
104 if (desiredState.equalsIgnoreCase(currentState))
105 {
106 continue;
107 }
108
109 String pendingState = currentStateOutput.getPendingState(resourceName, partition,
110 instanceName);
111
112 String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
113 if (nextState == null)
114 {
115 logger.error("Unable to find a next state for partition: "
116 + partition.getPartitionName() + " from stateModelDefinition"
117 + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
118 continue;
119 }
120
121 if (pendingState != null)
122 {
123 if (nextState.equalsIgnoreCase(pendingState))
124 {
125 logger.debug("Message already exists for " + instanceName + " to transit "
126 + partition.getPartitionName() + " from " + currentState + " to " + nextState);
127 } else if (currentState.equalsIgnoreCase(pendingState))
128 {
129 logger.info("Message hasn't been removed for " + instanceName + " to transit"
130 + partition.getPartitionName() + " to " + pendingState + ", desiredState: "
131 + desiredState);
132 } else
133 {
134 logger.info("IdealState changed before state transition completes for "
135 + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
136 + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
137 }
138 } else
139 {
140 Message message = createMessage(manager, resourceName, partition.getPartitionName(),
141 instanceName, currentState, nextState, sessionIdMap.get(instanceName),
142 stateModelDef.getId(), resource.getStateModelFactoryname(), bucketSize);
143 IdealState idealState = cache.getIdealState(resourceName);
144 if(idealState!= null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
145 {
146 if(idealState.getRecord().getMapField(partition.getPartitionName())!=null)
147 {
148 message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(), idealState.getRecord().getMapField(partition.getPartitionName()));
149 }
150 }
151
152 String stateTransition = currentState + "-" + nextState + "_"
153 + Message.Attributes.TIMEOUT;
154 if (idealState != null)
155 {
156 String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
157 if(timeOutStr == null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
158 {
159
160 if(idealState.getRecord().getMapField(partition.getPartitionName()) != null)
161 {
162 timeOutStr = idealState.getRecord().getMapField(partition.getPartitionName()).get(Message.Attributes.TIMEOUT.toString());
163 }
164 }
165 if(timeOutStr !=null)
166 {
167 try
168 {
169 int timeout = Integer.parseInt(timeOutStr);
170 if (timeout > 0)
171 {
172 message.setExecutionTimeout(timeout);
173 }
174 } catch (Exception e)
175 {
176 logger.error("", e);
177 }
178 }
179 }
180 message.getRecord().setSimpleField("ClusterEventName", event.getName());
181
182 if (!messageMap.containsKey(desiredState)) {
183 messageMap.put(desiredState, new ArrayList<Message>());
184 }
185 messageMap.get(desiredState).add(message);
186 }
187 }
188
189
190 List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
191 for (String state : statesPriorityList) {
192 if (messageMap.containsKey(state)) {
193 for (Message message : messageMap.get(state)) {
194 output.addMessage(resourceName, partition, message);
195 }
196 }
197 }
198
199 }
200 }
201 event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
202 }
203
204 private Message createMessage(HelixManager manager, String resourceName, String partitionName,
205 String instanceName, String currentState, String nextState, String sessionId,
206 String stateModelDefName, String stateModelFactoryName, int bucketSize)
207 {
208 String uuid = UUID.randomUUID().toString();
209 Message message = new Message(MessageType.STATE_TRANSITION, uuid);
210 message.setSrcName(manager.getInstanceName());
211 message.setTgtName(instanceName);
212 message.setMsgState(MessageState.NEW);
213 message.setPartitionName(partitionName);
214 message.setResourceName(resourceName);
215 message.setFromState(currentState);
216 message.setToState(nextState);
217 message.setTgtSessionId(sessionId);
218 message.setSrcSessionId(manager.getSessionId());
219 message.setStateModelDef(stateModelDefName);
220 message.setStateModelFactoryName(stateModelFactoryName);
221 message.setBucketSize(bucketSize);
222
223 return message;
224 }
225 }