View Javadoc

1   package org.apache.helix.messaging.handling;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  import java.util.HashMap;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.ConcurrentLinkedQueue;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.apache.helix.PropertyKey;
29  import org.apache.helix.model.CurrentState;
30  import org.apache.helix.model.Message;
31  import org.apache.helix.model.Message.Attributes;
32  
33  
34  public class GroupMessageHandler
35  {
36    class CurrentStateUpdate
37    {
38      final PropertyKey  _key;
39      final CurrentState _curStateDelta;
40  
41      public CurrentStateUpdate(PropertyKey key, CurrentState curStateDelta)
42      {
43        _key = key;
44        _curStateDelta = curStateDelta;
45      }
46  
47      public void merge(CurrentState curState)
48      {
49        _curStateDelta.getRecord().merge(curState.getRecord());
50      }
51    }
52  
53    static class GroupMessageInfo
54    {
55      final Message                                   _message;
56      final AtomicInteger                             _countDown;
57      final ConcurrentLinkedQueue<CurrentStateUpdate> _curStateUpdateList;
58  
59      public GroupMessageInfo(Message message)
60      {
61        _message = message;
62        List<String> partitionNames = message.getPartitionNames();
63        _countDown = new AtomicInteger(partitionNames.size());
64        _curStateUpdateList = new ConcurrentLinkedQueue<CurrentStateUpdate>();
65      }
66      
67      public Map<PropertyKey, CurrentState> merge()
68      {
69        Map<String, CurrentStateUpdate> curStateUpdateMap =
70            new HashMap<String, CurrentStateUpdate>();
71        for (CurrentStateUpdate update : _curStateUpdateList)
72        {
73          String path = update._key.getPath();
74          if (!curStateUpdateMap.containsKey(path))
75          {
76            curStateUpdateMap.put(path, update);
77          }
78          else
79          {
80            curStateUpdateMap.get(path).merge(update._curStateDelta);
81          }
82        }
83  
84        Map<PropertyKey, CurrentState> ret = new HashMap<PropertyKey, CurrentState>();
85        for (CurrentStateUpdate update : curStateUpdateMap.values())
86        {
87          ret.put(update._key, update._curStateDelta);
88        }
89  
90        return ret;
91      }
92   
93    }
94  
95    final ConcurrentHashMap<String, GroupMessageInfo> _groupMsgMap;
96  
97    public GroupMessageHandler()
98    {
99      _groupMsgMap = new ConcurrentHashMap<String, GroupMessageInfo>();
100   }
101 
102   public void put(Message message)
103   {
104     _groupMsgMap.putIfAbsent(message.getId(), new GroupMessageInfo(message));
105   }
106 
107   // return non-null if all sub-messages are completed
108   public GroupMessageInfo onCompleteSubMessage(Message subMessage)
109   {
110     String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
111     GroupMessageInfo info = _groupMsgMap.get(parentMid);
112     if (info != null)
113     {
114       int val = info._countDown.decrementAndGet();
115       if (val <= 0)
116       {
117         return _groupMsgMap.remove(parentMid);
118       }
119     }
120 
121     return null;
122   }
123 
124   void addCurStateUpdate(Message subMessage, PropertyKey key, CurrentState delta)
125   {
126     String parentMid = subMessage.getAttribute(Attributes.PARENT_MSG_ID);
127     GroupMessageInfo info = _groupMsgMap.get(parentMid);
128     if (info != null)
129     {
130       info._curStateUpdateList.add(new CurrentStateUpdate(key, delta));
131     }
132 
133   }
134 }