View Javadoc

1   package org.apache.helix.controller.stages;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.List;
23  import java.util.Map;
24  
25  import org.apache.helix.controller.pipeline.AbstractBaseStage;
26  import org.apache.helix.controller.pipeline.StageException;
27  import org.apache.helix.model.CurrentState;
28  import org.apache.helix.model.LiveInstance;
29  import org.apache.helix.model.Message;
30  import org.apache.helix.model.Partition;
31  import org.apache.helix.model.Resource;
32  import org.apache.helix.model.Message.MessageType;
33  
34  
35  /**
36   * For each LiveInstances select currentState and message whose sessionId matches
37   * sessionId from LiveInstance Get Partition,State for all the resources computed in
38   * previous State [ResourceComputationStage]
39   * 
40   * 
41   */
42  public class CurrentStateComputationStage extends AbstractBaseStage
43  {
44    @Override
45    public void process(ClusterEvent event) throws Exception
46    {
47      ClusterDataCache cache = event.getAttribute("ClusterDataCache");
48      Map<String, Resource> resourceMap =
49          event.getAttribute(AttributeName.RESOURCES.toString());
50  
51      if (cache == null || resourceMap == null)
52      {
53        throw new StageException("Missing attributes in event:" + event
54            + ". Requires DataCache|RESOURCE");
55      }
56  
57      Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
58      CurrentStateOutput currentStateOutput = new CurrentStateOutput();
59  
60      for (LiveInstance instance : liveInstances.values())
61      {
62        String instanceName = instance.getInstanceName();
63        Map<String, Message> instanceMessages = cache.getMessages(instanceName);
64        for (Message message : instanceMessages.values())
65        {
66          if (!MessageType.STATE_TRANSITION.toString()
67                                           .equalsIgnoreCase(message.getMsgType()))
68          {
69            continue;
70          }
71          if (!instance.getSessionId().equals(message.getTgtSessionId()))
72          {
73            continue;
74          }
75          String resourceName = message.getResourceName();
76          Resource resource = resourceMap.get(resourceName);
77          if (resource == null)
78          {
79            continue;
80          }
81  
82          if (!message.getBatchMessageMode())
83          {
84            String partitionName = message.getPartitionName();
85            Partition partition = resource.getPartition(partitionName);
86            if (partition != null)
87            {
88              currentStateOutput.setPendingState(resourceName,
89                                                 partition,
90                                                 instanceName,
91                                                 message.getToState());
92            }
93            else
94            {
95              // log
96            }
97          }
98          else
99          {
100           List<String> partitionNames = message.getPartitionNames();
101           if (!partitionNames.isEmpty())
102           {
103             for (String partitionName : partitionNames)
104             {
105               Partition partition = resource.getPartition(partitionName);
106               if (partition != null)
107               {
108                 currentStateOutput.setPendingState(resourceName,
109                                                    partition,
110                                                    instanceName,
111                                                    message.getToState());
112               }
113               else
114               {
115                 // log
116               }
117             }
118           }
119         }
120       }
121     }
122     for (LiveInstance instance : liveInstances.values())
123     {
124       String instanceName = instance.getInstanceName();
125 
126       String clientSessionId = instance.getSessionId();
127       Map<String, CurrentState> currentStateMap =
128           cache.getCurrentState(instanceName, clientSessionId);
129       for (CurrentState currentState : currentStateMap.values())
130       {
131 
132         if (!instance.getSessionId().equals(currentState.getSessionId()))
133         {
134           continue;
135         }
136         String resourceName = currentState.getResourceName();
137         String stateModelDefName = currentState.getStateModelDefRef();
138         Resource resource = resourceMap.get(resourceName);
139         if (resource == null)
140         {
141           continue;
142         }
143         if (stateModelDefName != null)
144         {
145           currentStateOutput.setResourceStateModelDef(resourceName, stateModelDefName);
146         }
147 
148         currentStateOutput.setBucketSize(resourceName, currentState.getBucketSize());
149 
150         Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
151         for (String partitionName : partitionStateMap.keySet())
152         {
153           Partition partition = resource.getPartition(partitionName);
154           if (partition != null)
155           {
156             currentStateOutput.setCurrentState(resourceName,
157                                                partition,
158                                                instanceName,
159                                                currentState.getState(partitionName));
160 
161           }
162           else
163           {
164             // log
165           }
166         }
167       }
168     }
169     event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
170   }
171 }