View Javadoc

1   package org.apache.helix.messaging;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.UUID;
27  import java.util.concurrent.ConcurrentHashMap;
28  
29  import org.apache.helix.ClusterMessagingService;
30  import org.apache.helix.ConfigAccessor;
31  import org.apache.helix.model.ConfigScope;
32  import org.apache.helix.model.builder.ConfigScopeBuilder;
33  import org.apache.helix.Criteria;
34  import org.apache.helix.HelixDataAccessor;
35  import org.apache.helix.HelixManager;
36  import org.apache.helix.InstanceType;
37  import org.apache.helix.PropertyKey.Builder;
38  import org.apache.helix.messaging.handling.AsyncCallbackService;
39  import org.apache.helix.messaging.handling.HelixTaskExecutor;
40  import org.apache.helix.messaging.handling.MessageHandlerFactory;
41  import org.apache.helix.model.LiveInstance;
42  import org.apache.helix.model.Message;
43  import org.apache.helix.model.Message.MessageType;
44  import org.apache.log4j.Logger;
45  
46  
47  public class DefaultMessagingService implements ClusterMessagingService
48  {
49    private final HelixManager         _manager;
50    private final CriteriaEvaluator    _evaluator;
51    private final HelixTaskExecutor    _taskExecutor;
52    // TODO:rename to factory, this is not a service
53    private final AsyncCallbackService _asyncCallbackService;
54    private static Logger              _logger =
55                                                   Logger.getLogger(DefaultMessagingService.class);
56    ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded
57      = new ConcurrentHashMap<String, MessageHandlerFactory>();
58    
59    public DefaultMessagingService(HelixManager manager)
60    {
61      _manager = manager;
62      _evaluator = new CriteriaEvaluator();
63      _taskExecutor = new HelixTaskExecutor();
64      _asyncCallbackService = new AsyncCallbackService();
65      _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
66                                                  _asyncCallbackService);
67    }
68  
69    @Override
70    public int send(Criteria recipientCriteria, final Message messageTemplate)
71    {
72      return send(recipientCriteria, messageTemplate, null, -1);
73    }
74  
75    @Override
76    public int send(final Criteria recipientCriteria,
77                    final Message message,
78                    AsyncCallback callbackOnReply,
79                    int timeOut)
80    {
81      return send(recipientCriteria, message, callbackOnReply, timeOut, 0);
82    }
83  
84    @Override
85    public int send(final Criteria recipientCriteria,
86                    final Message message,
87                    AsyncCallback callbackOnReply,
88                    int timeOut,
89                    int retryCount)
90    {
91      Map<InstanceType, List<Message>> generateMessage =
92          generateMessage(recipientCriteria, message);
93      int totalMessageCount = 0;
94      for (List<Message> messages : generateMessage.values())
95      {
96        totalMessageCount += messages.size();
97      }
98      _logger.info("Send " + totalMessageCount + " messages with criteria "
99          + recipientCriteria);
100     if (totalMessageCount == 0)
101     {
102       return 0;
103     }
104     String correlationId = null;
105     if (callbackOnReply != null)
106     {
107       int totalTimeout = timeOut * (retryCount + 1);
108       if (totalTimeout < 0)
109       {
110         totalTimeout = -1;
111       }
112       callbackOnReply.setTimeout(totalTimeout);
113       correlationId = UUID.randomUUID().toString();
114       for (List<Message> messages : generateMessage.values())
115       {
116         callbackOnReply.setMessagesSent(messages);
117       }
118       _asyncCallbackService.registerAsyncCallback(correlationId, callbackOnReply);
119     }
120 
121     for (InstanceType receiverType : generateMessage.keySet())
122     {
123       List<Message> list = generateMessage.get(receiverType);
124       for (Message tempMessage : list)
125       {
126         tempMessage.setRetryCount(retryCount);
127         tempMessage.setExecutionTimeout(timeOut);
128         tempMessage.setSrcInstanceType(_manager.getInstanceType());
129         if (correlationId != null)
130         {
131           tempMessage.setCorrelationId(correlationId);
132         }
133 
134         HelixDataAccessor accessor = _manager.getHelixDataAccessor();
135         Builder keyBuilder = accessor.keyBuilder();
136 
137         if (receiverType == InstanceType.CONTROLLER)
138         {
139           // _manager.getDataAccessor().setProperty(PropertyType.MESSAGES_CONTROLLER,
140           // tempMessage,
141           // tempMessage.getId());
142           accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()),
143                                tempMessage);
144         }
145 
146         if (receiverType == InstanceType.PARTICIPANT)
147         {
148           accessor.setProperty(keyBuilder.message(tempMessage.getTgtName(),
149                                                   tempMessage.getId()),
150                                tempMessage);
151         }
152       }
153     }
154 
155     if (callbackOnReply != null)
156     {
157       // start timer if timeout is set
158       callbackOnReply.startTimer();
159     }
160     return totalMessageCount;
161   }
162 
163   public Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
164                                                            final Message message)
165   {
166     Map<InstanceType, List<Message>> messagesToSendMap =
167         new HashMap<InstanceType, List<Message>>();
168     InstanceType instanceType = recipientCriteria.getRecipientInstanceType();
169 
170     if (instanceType == InstanceType.CONTROLLER)
171     {
172       List<Message> messages = generateMessagesForController(message);
173       messagesToSendMap.put(InstanceType.CONTROLLER, messages);
174       // _dataAccessor.setControllerProperty(PropertyType.MESSAGES,
175       // newMessage.getRecord(), CreateMode.PERSISTENT);
176     }
177     else if (instanceType == InstanceType.PARTICIPANT)
178     {
179       List<Message> messages = new ArrayList<Message>();
180       List<Map<String, String>> matchedList =
181           _evaluator.evaluateCriteria(recipientCriteria, _manager);
182 
183       if (!matchedList.isEmpty())
184       {
185         Map<String, String> sessionIdMap = new HashMap<String, String>();
186         if (recipientCriteria.isSessionSpecific())
187         {
188           HelixDataAccessor accessor = _manager.getHelixDataAccessor();
189           Builder keyBuilder = accessor.keyBuilder();
190 
191           List<LiveInstance> liveInstances =
192               accessor.getChildValues(keyBuilder.liveInstances());
193 
194           for (LiveInstance liveInstance : liveInstances)
195           {
196             sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
197           }
198         }
199         for (Map<String, String> map : matchedList)
200         {
201           String id = UUID.randomUUID().toString();
202           Message newMessage = new Message(message.getRecord(), id);
203           String srcInstanceName = _manager.getInstanceName();
204           String tgtInstanceName = map.get("instanceName");
205           // Don't send message to self
206           if (recipientCriteria.isSelfExcluded()
207               && srcInstanceName.equalsIgnoreCase(tgtInstanceName))
208           {
209             continue;
210           }
211           newMessage.setSrcName(srcInstanceName);
212           newMessage.setTgtName(tgtInstanceName);
213           newMessage.setResourceName(map.get("resourceName"));
214           newMessage.setPartitionName(map.get("partitionName"));
215           if (recipientCriteria.isSessionSpecific())
216           {
217             newMessage.setTgtSessionId(sessionIdMap.get(tgtInstanceName));
218           }
219           messages.add(newMessage);
220         }
221         messagesToSendMap.put(InstanceType.PARTICIPANT, messages);
222       }
223     }
224     return messagesToSendMap;
225   }
226 
227   private List<Message> generateMessagesForController(Message message)
228   {
229     List<Message> messages = new ArrayList<Message>();
230     String id = UUID.randomUUID().toString();
231     Message newMessage = new Message(message.getRecord(), id);
232     newMessage.setMsgId(id);
233     newMessage.setSrcName(_manager.getInstanceName());
234     newMessage.setTgtName("Controller");
235     messages.add(newMessage);
236     return messages;
237   }
238   
239   @Override
240   public synchronized void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
241   {
242     if (_manager.isConnected())
243     {
244       registerMessageHandlerFactoryInternal(type, factory);
245     }
246     else
247     {
248       _messageHandlerFactoriestobeAdded.put(type, factory);
249     }
250   }
251   
252   public synchronized void onConnected()
253   {
254     for(String type : _messageHandlerFactoriestobeAdded.keySet())
255     {
256       registerMessageHandlerFactoryInternal(type, _messageHandlerFactoriestobeAdded.get(type));
257     }
258     _messageHandlerFactoriestobeAdded.clear();
259   }
260   
261   void registerMessageHandlerFactoryInternal(String type, MessageHandlerFactory factory)
262   {
263     _logger.info("registering msg factory for type " + type);
264     int threadpoolSize = HelixTaskExecutor.DEFAULT_PARALLEL_TASKS;
265     String threadpoolSizeStr = null;
266     String key = type + "." + HelixTaskExecutor.MAX_THREADS;
267     
268     ConfigAccessor configAccessor = _manager.getConfigAccessor();
269     if(configAccessor != null)
270     {
271       ConfigScope scope = null;
272       
273       // Read the participant config and cluster config for the per-message type thread pool size.
274       // participant config will override the cluster config.
275       
276       if(_manager.getInstanceType() == InstanceType.PARTICIPANT || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
277       { 
278         scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).forParticipant(_manager.getInstanceName()).build();
279         threadpoolSizeStr = configAccessor.get(scope, key);
280       }
281       
282       if(threadpoolSizeStr == null)
283       {
284         scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
285         threadpoolSizeStr = configAccessor.get(scope, key);
286       }
287     }
288     
289     if(threadpoolSizeStr != null)
290     {
291       try
292       {
293         threadpoolSize = Integer.parseInt(threadpoolSizeStr);
294         if(threadpoolSize <= 0)
295         {
296           threadpoolSize = 1;
297         }
298       }
299       catch(Exception e)
300       {
301         _logger.error("", e);
302       }
303     }
304     
305     _taskExecutor.registerMessageHandlerFactory(type, factory, threadpoolSize);
306     // Self-send a no-op message, so that the onMessage() call will be invoked
307     // again, and
308     // we have a chance to process the message that we received with the new
309     // added MessageHandlerFactory
310     // before the factory is added.
311     sendNopMessage();
312   }
313 
314   public void sendNopMessage()
315   {
316     if (_manager.isConnected())
317     {
318       try
319       {
320         Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
321         nopMsg.setSrcName(_manager.getInstanceName());
322 
323         HelixDataAccessor accessor = _manager.getHelixDataAccessor();
324         Builder keyBuilder = accessor.keyBuilder();
325 
326         if (_manager.getInstanceType() == InstanceType.CONTROLLER
327             || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
328         {
329           nopMsg.setTgtName("Controller");
330           accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
331         }
332 
333         if (_manager.getInstanceType() == InstanceType.PARTICIPANT
334             || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
335         {
336           nopMsg.setTgtName(_manager.getInstanceName());
337           accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()),
338                                nopMsg);
339         }
340       }
341       catch (Exception e)
342       {
343         _logger.error(e);
344       }
345     }
346   }
347 
348   public HelixTaskExecutor getExecutor()
349   {
350     return _taskExecutor;
351   }
352 
353   @Override
354   public int sendAndWait(Criteria receipientCriteria,
355                          Message message,
356                          AsyncCallback asyncCallback,
357                          int timeOut,
358                          int retryCount)
359   {
360     int messagesSent =
361         send(receipientCriteria, message, asyncCallback, timeOut, retryCount);
362     if (messagesSent > 0)
363     {
364       while (!asyncCallback.isDone() && !asyncCallback.isTimedOut())
365       {
366         synchronized (asyncCallback)
367         {
368           try
369           {
370             asyncCallback.wait();
371           }
372           catch (InterruptedException e)
373           {
374             _logger.error(e);
375             asyncCallback.setInterrupted(true);
376             break;
377           }
378         }
379       }
380     }
381     else
382     {
383       _logger.warn("No messages sent. For Criteria:" + receipientCriteria);
384     }
385     return messagesSent;
386   }
387 
388   @Override
389   public int sendAndWait(Criteria recipientCriteria,
390                          Message message,
391                          AsyncCallback asyncCallback,
392                          int timeOut)
393   {
394     return sendAndWait(recipientCriteria, message, asyncCallback, timeOut, 0);
395   }
396 }