1 package org.apache.helix.controller.stages;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.List;
23 import java.util.Map;
24
25 import org.apache.helix.controller.pipeline.AbstractBaseStage;
26 import org.apache.helix.controller.pipeline.StageException;
27 import org.apache.helix.model.CurrentState;
28 import org.apache.helix.model.LiveInstance;
29 import org.apache.helix.model.Message;
30 import org.apache.helix.model.Partition;
31 import org.apache.helix.model.Resource;
32 import org.apache.helix.model.Message.MessageType;
33
34
35
36
37
38
39
40
41
42 public class CurrentStateComputationStage extends AbstractBaseStage
43 {
44 @Override
45 public void process(ClusterEvent event) throws Exception
46 {
47 ClusterDataCache cache = event.getAttribute("ClusterDataCache");
48 Map<String, Resource> resourceMap =
49 event.getAttribute(AttributeName.RESOURCES.toString());
50
51 if (cache == null || resourceMap == null)
52 {
53 throw new StageException("Missing attributes in event:" + event
54 + ". Requires DataCache|RESOURCE");
55 }
56
57 Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
58 CurrentStateOutput currentStateOutput = new CurrentStateOutput();
59
60 for (LiveInstance instance : liveInstances.values())
61 {
62 String instanceName = instance.getInstanceName();
63 Map<String, Message> instanceMessages = cache.getMessages(instanceName);
64 for (Message message : instanceMessages.values())
65 {
66 if (!MessageType.STATE_TRANSITION.toString()
67 .equalsIgnoreCase(message.getMsgType()))
68 {
69 continue;
70 }
71 if (!instance.getSessionId().equals(message.getTgtSessionId()))
72 {
73 continue;
74 }
75 String resourceName = message.getResourceName();
76 Resource resource = resourceMap.get(resourceName);
77 if (resource == null)
78 {
79 continue;
80 }
81
82 if (!message.getBatchMessageMode())
83 {
84 String partitionName = message.getPartitionName();
85 Partition partition = resource.getPartition(partitionName);
86 if (partition != null)
87 {
88 currentStateOutput.setPendingState(resourceName,
89 partition,
90 instanceName,
91 message.getToState());
92 }
93 else
94 {
95
96 }
97 }
98 else
99 {
100 List<String> partitionNames = message.getPartitionNames();
101 if (!partitionNames.isEmpty())
102 {
103 for (String partitionName : partitionNames)
104 {
105 Partition partition = resource.getPartition(partitionName);
106 if (partition != null)
107 {
108 currentStateOutput.setPendingState(resourceName,
109 partition,
110 instanceName,
111 message.getToState());
112 }
113 else
114 {
115
116 }
117 }
118 }
119 }
120 }
121 }
122 for (LiveInstance instance : liveInstances.values())
123 {
124 String instanceName = instance.getInstanceName();
125
126 String clientSessionId = instance.getSessionId();
127 Map<String, CurrentState> currentStateMap =
128 cache.getCurrentState(instanceName, clientSessionId);
129 for (CurrentState currentState : currentStateMap.values())
130 {
131
132 if (!instance.getSessionId().equals(currentState.getSessionId()))
133 {
134 continue;
135 }
136 String resourceName = currentState.getResourceName();
137 String stateModelDefName = currentState.getStateModelDefRef();
138 Resource resource = resourceMap.get(resourceName);
139 if (resource == null)
140 {
141 continue;
142 }
143 if (stateModelDefName != null)
144 {
145 currentStateOutput.setResourceStateModelDef(resourceName, stateModelDefName);
146 }
147
148 currentStateOutput.setBucketSize(resourceName, currentState.getBucketSize());
149
150 Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
151 for (String partitionName : partitionStateMap.keySet())
152 {
153 Partition partition = resource.getPartition(partitionName);
154 if (partition != null)
155 {
156 currentStateOutput.setCurrentState(resourceName,
157 partition,
158 instanceName,
159 currentState.getState(partitionName));
160
161 }
162 else
163 {
164
165 }
166 }
167 }
168 }
169 event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
170 }
171 }