1 package org.apache.helix.messaging.handling;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentLinkedQueue;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import org.apache.helix.PropertyKey;
29 import org.apache.helix.model.CurrentState;
30 import org.apache.helix.model.Message;
31 import org.apache.helix.model.Message.Attributes;
32
33
34 public class GroupMessageHandler
35 {
36 class CurrentStateUpdate
37 {
38 final PropertyKey _key;
39 final CurrentState _curStateDelta;
40
41 public CurrentStateUpdate(PropertyKey key, CurrentState curStateDelta)
42 {
43 _key = key;
44 _curStateDelta = curStateDelta;
45 }
46
47 public void merge(CurrentState curState)
48 {
49 _curStateDelta.getRecord().merge(curState.getRecord());
50 }
51 }
52
53 static class GroupMessageInfo
54 {
55 final Message _message;
56 final AtomicInteger _countDown;
57 final ConcurrentLinkedQueue<CurrentStateUpdate> _curStateUpdateList;
58
59 public GroupMessageInfo(Message message)
60 {
61 _message = message;
62 List<String> partitionNames = message.getPartitionNames();
63 _countDown = new AtomicInteger(partitionNames.size());
64 _curStateUpdateList = new ConcurrentLinkedQueue<CurrentStateUpdate>();
65 }
66
67 public Map<PropertyKey, CurrentState> merge()
68 {
69 Map<String, CurrentStateUpdate> curStateUpdateMap =
70 new HashMap<String, CurrentStateUpdate>();
71 for (CurrentStateUpdate update : _curStateUpdateList)
72 {
73 String path = update._key.getPath();
74 if (!curStateUpdateMap.containsKey(path))
75 {
76 curStateUpdateMap.put(path, update);
77 }
78 else
79 {
80 curStateUpdateMap.get(path).merge(update._curStateDelta);
81 }
82 }
83
84 Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>();
85 for (CurrentStateUpdate update : curStateUpdateMap.values())
86 {
87 ret.put(update._key, update._curStateDelta);
88 }
89
90 return ret;
91 }
92
93 }
94
95 final ConcurrentHashMap<String, GroupMessageInfo> _groupMsgMap;
96
97 public GroupMessageHandler()
98 {
99 _groupMsgMap = new ConcurrentHashMap<String, GroupMessageInfo>();
100 }
101
102 public void put(Message message)
103 {
104 _groupMsgMap.putIfAbsent(message.getId(), new GroupMessageInfo(message));
105 }
106
107
108 public GroupMessageInfo onCompleteSubMessage(Message subMessage)
109 {
110 String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
111 GroupMessageInfo info = _groupMsgMap.get(parentMid);
112 if (info != null)
113 {
114 int val = info._countDown.decrementAndGet();
115 if (val <= 0)
116 {
117 return _groupMsgMap.remove(parentMid);
118 }
119 }
120
121 return null;
122 }
123
124 void addCurStateUpdate(Message subMessage, PropertyKey key, CurrentState delta)
125 {
126 String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
127 GroupMessageInfo info = _groupMsgMap.get(parentMid);
128 if (info != null)
129 {
130 info._curStateUpdateList.add(new CurrentStateUpdate(key, delta));
131 }
132
133 }
134 }