View Javadoc

1   package org.apache.helix.controller.stages;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.UUID;
27  
28  import org.apache.helix.HelixManager;
29  import org.apache.helix.controller.pipeline.AbstractBaseStage;
30  import org.apache.helix.controller.pipeline.StageException;
31  import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
32  import org.apache.helix.model.IdealState;
33  import org.apache.helix.model.LiveInstance;
34  import org.apache.helix.model.Message;
35  import org.apache.helix.model.Partition;
36  import org.apache.helix.model.Resource;
37  import org.apache.helix.model.StateModelDefinition;
38  import org.apache.helix.model.Message.MessageState;
39  import org.apache.helix.model.Message.MessageType;
40  import org.apache.log4j.Logger;
41  
42  /**
43   * Compares the currentState, pendingState with IdealState and generate messages
44   * 
45   */
46  public class MessageGenerationPhase extends AbstractBaseStage
47  {
48    private static Logger logger = Logger.getLogger(MessageGenerationPhase.class);
49  
50    @Override
51    public void process(ClusterEvent event) throws Exception
52    {
53      HelixManager manager = event.getAttribute("helixmanager");
54      ClusterDataCache cache = event.getAttribute("ClusterDataCache");
55      Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
56      CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE
57          .toString());
58      BestPossibleStateOutput bestPossibleStateOutput = event
59          .getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
60      if (manager == null || cache == null || resourceMap == null || currentStateOutput == null
61          || bestPossibleStateOutput == null)
62      {
63        throw new StageException("Missing attributes in event:" + event
64            + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
65      }
66  
67      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
68      Map<String, String> sessionIdMap = new HashMap<String, String>();
69  
70      for (LiveInstance liveInstance : liveInstances.values())
71      {
72        sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
73      }
74      MessageGenerationOutput output = new MessageGenerationOutput();
75  
76      for (String resourceName : resourceMap.keySet())
77      {
78        Resource resource = resourceMap.get(resourceName);
79        int bucketSize = resource.getBucketSize();
80  
81        StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
82  
83        for (Partition partition : resource.getPartitions())
84        {
85          Map<String, String> instanceStateMap = bestPossibleStateOutput.getInstanceStateMap(
86              resourceName, partition);
87  
88          // we should generate message based on the desired-state priority
89          // so keep generated messages in a temp map keyed by state
90          // desired-state->list of generated-messages
91          Map<String, List<Message>> messageMap = new HashMap<String, List<Message>>();
92          
93          for (String instanceName : instanceStateMap.keySet())
94          {
95            String desiredState = instanceStateMap.get(instanceName);
96  
97            String currentState = currentStateOutput.getCurrentState(resourceName, partition,
98                instanceName);
99            if (currentState == null)
100           {
101             currentState = stateModelDef.getInitialState();
102           }
103 
104           if (desiredState.equalsIgnoreCase(currentState))
105           {
106             continue;
107           }
108 
109           String pendingState = currentStateOutput.getPendingState(resourceName, partition,
110               instanceName);
111 
112           String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);
113           if (nextState == null)
114           {
115             logger.error("Unable to find a next state for partition: "
116                 + partition.getPartitionName() + " from stateModelDefinition"
117                 + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
118             continue;
119           }
120 
121           if (pendingState != null)
122           {
123             if (nextState.equalsIgnoreCase(pendingState))
124             {
125               logger.debug("Message already exists for " + instanceName + " to transit "
126                   + partition.getPartitionName() + " from " + currentState + " to " + nextState);
127             } else if (currentState.equalsIgnoreCase(pendingState))
128             {
129               logger.info("Message hasn't been removed for " + instanceName + " to transit"
130                   + partition.getPartitionName() + " to " + pendingState + ", desiredState: "
131                   + desiredState);
132             } else
133             {
134               logger.info("IdealState changed before state transition completes for "
135                   + partition.getPartitionName() + " on " + instanceName + ", pendingState: "
136                   + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
137             }
138           } else
139           {
140             Message message = createMessage(manager, resourceName, partition.getPartitionName(),
141                 instanceName, currentState, nextState, sessionIdMap.get(instanceName),
142                 stateModelDef.getId(), resource.getStateModelFactoryname(), bucketSize);
143             IdealState idealState = cache.getIdealState(resourceName);
144             if(idealState!= null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
145             {
146               if(idealState.getRecord().getMapField(partition.getPartitionName())!=null)
147               {
148                 message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(), idealState.getRecord().getMapField(partition.getPartitionName()));
149               }
150             }
151             // Set timeout of needed
152             String stateTransition = currentState + "-" + nextState + "_"
153                 + Message.Attributes.TIMEOUT;
154             if (idealState != null)
155             {
156               String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
157               if(timeOutStr == null && idealState.getStateModelDefRef().equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE))
158               {
159                 // scheduled task queue
160                 if(idealState.getRecord().getMapField(partition.getPartitionName()) != null)
161                 {
162                   timeOutStr = idealState.getRecord().getMapField(partition.getPartitionName()).get(Message.Attributes.TIMEOUT.toString());
163                 }
164               }
165               if(timeOutStr !=null)
166               {
167                 try
168                 {
169                   int timeout = Integer.parseInt(timeOutStr);
170                   if (timeout > 0)
171                   {
172                     message.setExecutionTimeout(timeout);
173                   }
174                 } catch (Exception e)
175                 {
176                   logger.error("", e);
177                 }
178               }
179             }
180             message.getRecord().setSimpleField("ClusterEventName", event.getName());
181             // output.addMessage(resourceName, partition, message);
182             if (!messageMap.containsKey(desiredState)) {
183             	messageMap.put(desiredState, new ArrayList<Message>());
184             }
185             messageMap.get(desiredState).add(message);
186           }
187         }
188         
189         // add generated messages to output according to state priority
190         List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
191         for (String state : statesPriorityList) {
192         	if (messageMap.containsKey(state)) {
193         		for (Message message : messageMap.get(state)) {
194         			output.addMessage(resourceName, partition, message);
195         		}
196         	}
197         }
198         
199       }	// end of for-each-partition
200     }
201     event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
202   }
203 
204   private Message createMessage(HelixManager manager, String resourceName, String partitionName,
205       String instanceName, String currentState, String nextState, String sessionId,
206       String stateModelDefName, String stateModelFactoryName, int bucketSize)
207   {
208     String uuid = UUID.randomUUID().toString();
209     Message message = new Message(MessageType.STATE_TRANSITION, uuid);
210     message.setSrcName(manager.getInstanceName());
211     message.setTgtName(instanceName);
212     message.setMsgState(MessageState.NEW);
213     message.setPartitionName(partitionName);
214     message.setResourceName(resourceName);
215     message.setFromState(currentState);
216     message.setToState(nextState);
217     message.setTgtSessionId(sessionId);
218     message.setSrcSessionId(manager.getSessionId());
219     message.setStateModelDef(stateModelDefName);
220     message.setStateModelFactoryName(stateModelFactoryName);
221     message.setBucketSize(bucketSize);
222 
223     return message;
224   }
225 }