1 package org.apache.helix.messaging;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.UUID;
27 import java.util.concurrent.ConcurrentHashMap;
28
29 import org.apache.helix.ClusterMessagingService;
30 import org.apache.helix.ConfigAccessor;
31 import org.apache.helix.model.ConfigScope;
32 import org.apache.helix.model.builder.ConfigScopeBuilder;
33 import org.apache.helix.Criteria;
34 import org.apache.helix.HelixDataAccessor;
35 import org.apache.helix.HelixManager;
36 import org.apache.helix.InstanceType;
37 import org.apache.helix.PropertyKey.Builder;
38 import org.apache.helix.messaging.handling.AsyncCallbackService;
39 import org.apache.helix.messaging.handling.HelixTaskExecutor;
40 import org.apache.helix.messaging.handling.MessageHandlerFactory;
41 import org.apache.helix.model.LiveInstance;
42 import org.apache.helix.model.Message;
43 import org.apache.helix.model.Message.MessageType;
44 import org.apache.log4j.Logger;
45
46
47 public class DefaultMessagingService implements ClusterMessagingService
48 {
49 private final HelixManager _manager;
50 private final CriteriaEvaluator _evaluator;
51 private final HelixTaskExecutor _taskExecutor;
52
53 private final AsyncCallbackService _asyncCallbackService;
54 private static Logger _logger =
55 Logger.getLogger(DefaultMessagingService.class);
56 ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded
57 = new ConcurrentHashMap<String, MessageHandlerFactory>();
58
59 public DefaultMessagingService(HelixManager manager)
60 {
61 _manager = manager;
62 _evaluator = new CriteriaEvaluator();
63 _taskExecutor = new HelixTaskExecutor();
64 _asyncCallbackService = new AsyncCallbackService();
65 _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
66 _asyncCallbackService);
67 }
68
69 @Override
70 public int send(Criteria recipientCriteria, final Message messageTemplate)
71 {
72 return send(recipientCriteria, messageTemplate, null, -1);
73 }
74
75 @Override
76 public int send(final Criteria recipientCriteria,
77 final Message message,
78 AsyncCallback callbackOnReply,
79 int timeOut)
80 {
81 return send(recipientCriteria, message, callbackOnReply, timeOut, 0);
82 }
83
84 @Override
85 public int send(final Criteria recipientCriteria,
86 final Message message,
87 AsyncCallback callbackOnReply,
88 int timeOut,
89 int retryCount)
90 {
91 Map<InstanceType, List<Message>> generateMessage =
92 generateMessage(recipientCriteria, message);
93 int totalMessageCount = 0;
94 for (List<Message> messages : generateMessage.values())
95 {
96 totalMessageCount += messages.size();
97 }
98 _logger.info("Send " + totalMessageCount + " messages with criteria "
99 + recipientCriteria);
100 if (totalMessageCount == 0)
101 {
102 return 0;
103 }
104 String correlationId = null;
105 if (callbackOnReply != null)
106 {
107 int totalTimeout = timeOut * (retryCount + 1);
108 if (totalTimeout < 0)
109 {
110 totalTimeout = -1;
111 }
112 callbackOnReply.setTimeout(totalTimeout);
113 correlationId = UUID.randomUUID().toString();
114 for (List<Message> messages : generateMessage.values())
115 {
116 callbackOnReply.setMessagesSent(messages);
117 }
118 _asyncCallbackService.registerAsyncCallback(correlationId, callbackOnReply);
119 }
120
121 for (InstanceType receiverType : generateMessage.keySet())
122 {
123 List<Message> list = generateMessage.get(receiverType);
124 for (Message tempMessage : list)
125 {
126 tempMessage.setRetryCount(retryCount);
127 tempMessage.setExecutionTimeout(timeOut);
128 tempMessage.setSrcInstanceType(_manager.getInstanceType());
129 if (correlationId != null)
130 {
131 tempMessage.setCorrelationId(correlationId);
132 }
133
134 HelixDataAccessor accessor = _manager.getHelixDataAccessor();
135 Builder keyBuilder = accessor.keyBuilder();
136
137 if (receiverType == InstanceType.CONTROLLER)
138 {
139
140
141
142 accessor.setProperty(keyBuilder.controllerMessage(tempMessage.getId()),
143 tempMessage);
144 }
145
146 if (receiverType == InstanceType.PARTICIPANT)
147 {
148 accessor.setProperty(keyBuilder.message(tempMessage.getTgtName(),
149 tempMessage.getId()),
150 tempMessage);
151 }
152 }
153 }
154
155 if (callbackOnReply != null)
156 {
157
158 callbackOnReply.startTimer();
159 }
160 return totalMessageCount;
161 }
162
163 public Map<InstanceType, List<Message>> generateMessage(final Criteria recipientCriteria,
164 final Message message)
165 {
166 Map<InstanceType, List<Message>> messagesToSendMap =
167 new HashMap<InstanceType, List<Message>>();
168 InstanceType instanceType = recipientCriteria.getRecipientInstanceType();
169
170 if (instanceType == InstanceType.CONTROLLER)
171 {
172 List<Message> messages = generateMessagesForController(message);
173 messagesToSendMap.put(InstanceType.CONTROLLER, messages);
174
175
176 }
177 else if (instanceType == InstanceType.PARTICIPANT)
178 {
179 List<Message> messages = new ArrayList<Message>();
180 List<Map<String, String>> matchedList =
181 _evaluator.evaluateCriteria(recipientCriteria, _manager);
182
183 if (!matchedList.isEmpty())
184 {
185 Map<String, String> sessionIdMap = new HashMap<String, String>();
186 if (recipientCriteria.isSessionSpecific())
187 {
188 HelixDataAccessor accessor = _manager.getHelixDataAccessor();
189 Builder keyBuilder = accessor.keyBuilder();
190
191 List<LiveInstance> liveInstances =
192 accessor.getChildValues(keyBuilder.liveInstances());
193
194 for (LiveInstance liveInstance : liveInstances)
195 {
196 sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
197 }
198 }
199 for (Map<String, String> map : matchedList)
200 {
201 String id = UUID.randomUUID().toString();
202 Message newMessage = new Message(message.getRecord(), id);
203 String srcInstanceName = _manager.getInstanceName();
204 String tgtInstanceName = map.get("instanceName");
205
206 if (recipientCriteria.isSelfExcluded()
207 && srcInstanceName.equalsIgnoreCase(tgtInstanceName))
208 {
209 continue;
210 }
211 newMessage.setSrcName(srcInstanceName);
212 newMessage.setTgtName(tgtInstanceName);
213 newMessage.setResourceName(map.get("resourceName"));
214 newMessage.setPartitionName(map.get("partitionName"));
215 if (recipientCriteria.isSessionSpecific())
216 {
217 newMessage.setTgtSessionId(sessionIdMap.get(tgtInstanceName));
218 }
219 messages.add(newMessage);
220 }
221 messagesToSendMap.put(InstanceType.PARTICIPANT, messages);
222 }
223 }
224 return messagesToSendMap;
225 }
226
227 private List<Message> generateMessagesForController(Message message)
228 {
229 List<Message> messages = new ArrayList<Message>();
230 String id = UUID.randomUUID().toString();
231 Message newMessage = new Message(message.getRecord(), id);
232 newMessage.setMsgId(id);
233 newMessage.setSrcName(_manager.getInstanceName());
234 newMessage.setTgtName("Controller");
235 messages.add(newMessage);
236 return messages;
237 }
238
239 @Override
240 public synchronized void registerMessageHandlerFactory(String type, MessageHandlerFactory factory)
241 {
242 if (_manager.isConnected())
243 {
244 registerMessageHandlerFactoryInternal(type, factory);
245 }
246 else
247 {
248 _messageHandlerFactoriestobeAdded.put(type, factory);
249 }
250 }
251
252 public synchronized void onConnected()
253 {
254 for(String type : _messageHandlerFactoriestobeAdded.keySet())
255 {
256 registerMessageHandlerFactoryInternal(type, _messageHandlerFactoriestobeAdded.get(type));
257 }
258 _messageHandlerFactoriestobeAdded.clear();
259 }
260
261 void registerMessageHandlerFactoryInternal(String type, MessageHandlerFactory factory)
262 {
263 _logger.info("registering msg factory for type " + type);
264 int threadpoolSize = HelixTaskExecutor.DEFAULT_PARALLEL_TASKS;
265 String threadpoolSizeStr = null;
266 String key = type + "." + HelixTaskExecutor.MAX_THREADS;
267
268 ConfigAccessor configAccessor = _manager.getConfigAccessor();
269 if(configAccessor != null)
270 {
271 ConfigScope scope = null;
272
273
274
275
276 if(_manager.getInstanceType() == InstanceType.PARTICIPANT || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
277 {
278 scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).forParticipant(_manager.getInstanceName()).build();
279 threadpoolSizeStr = configAccessor.get(scope, key);
280 }
281
282 if(threadpoolSizeStr == null)
283 {
284 scope = new ConfigScopeBuilder().forCluster(_manager.getClusterName()).build();
285 threadpoolSizeStr = configAccessor.get(scope, key);
286 }
287 }
288
289 if(threadpoolSizeStr != null)
290 {
291 try
292 {
293 threadpoolSize = Integer.parseInt(threadpoolSizeStr);
294 if(threadpoolSize <= 0)
295 {
296 threadpoolSize = 1;
297 }
298 }
299 catch(Exception e)
300 {
301 _logger.error("", e);
302 }
303 }
304
305 _taskExecutor.registerMessageHandlerFactory(type, factory, threadpoolSize);
306
307
308
309
310
311 sendNopMessage();
312 }
313
314 public void sendNopMessage()
315 {
316 if (_manager.isConnected())
317 {
318 try
319 {
320 Message nopMsg = new Message(MessageType.NO_OP, UUID.randomUUID().toString());
321 nopMsg.setSrcName(_manager.getInstanceName());
322
323 HelixDataAccessor accessor = _manager.getHelixDataAccessor();
324 Builder keyBuilder = accessor.keyBuilder();
325
326 if (_manager.getInstanceType() == InstanceType.CONTROLLER
327 || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
328 {
329 nopMsg.setTgtName("Controller");
330 accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), nopMsg);
331 }
332
333 if (_manager.getInstanceType() == InstanceType.PARTICIPANT
334 || _manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
335 {
336 nopMsg.setTgtName(_manager.getInstanceName());
337 accessor.setProperty(keyBuilder.message(nopMsg.getTgtName(), nopMsg.getId()),
338 nopMsg);
339 }
340 }
341 catch (Exception e)
342 {
343 _logger.error(e);
344 }
345 }
346 }
347
348 public HelixTaskExecutor getExecutor()
349 {
350 return _taskExecutor;
351 }
352
353 @Override
354 public int sendAndWait(Criteria receipientCriteria,
355 Message message,
356 AsyncCallback asyncCallback,
357 int timeOut,
358 int retryCount)
359 {
360 int messagesSent =
361 send(receipientCriteria, message, asyncCallback, timeOut, retryCount);
362 if (messagesSent > 0)
363 {
364 while (!asyncCallback.isDone() && !asyncCallback.isTimedOut())
365 {
366 synchronized (asyncCallback)
367 {
368 try
369 {
370 asyncCallback.wait();
371 }
372 catch (InterruptedException e)
373 {
374 _logger.error(e);
375 asyncCallback.setInterrupted(true);
376 break;
377 }
378 }
379 }
380 }
381 else
382 {
383 _logger.warn("No messages sent. For Criteria:" + receipientCriteria);
384 }
385 return messagesSent;
386 }
387
388 @Override
389 public int sendAndWait(Criteria recipientCriteria,
390 Message message,
391 AsyncCallback asyncCallback,
392 int timeOut)
393 {
394 return sendAndWait(recipientCriteria, message, asyncCallback, timeOut, 0);
395 }
396 }