1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 import org.apache.helix.HelixException;
24 import org.apache.helix.NotificationContext;
25 import org.apache.helix.messaging.handling.HelixTaskResult;
26 import org.apache.helix.messaging.handling.MessageHandler;
27 import org.apache.helix.messaging.handling.MessageHandlerFactory;
28 import org.apache.helix.model.Message;
29 import org.apache.helix.model.Message.MessageType;
30 import org.apache.log4j.Logger;
31
32
33 public class DefaultControllerMessageHandlerFactory implements
34 MessageHandlerFactory
35 {
36 private static Logger _logger = Logger.getLogger(DefaultControllerMessageHandlerFactory.class);
37 @Override
38 public MessageHandler createHandler(Message message,
39 NotificationContext context)
40 {
41 String type = message.getMsgType();
42
43 if(!type.equals(getMessageType()))
44 {
45 throw new HelixException("Unexpected msg type for message "+message.getMsgId()
46 +" type:" + message.getMsgType());
47 }
48
49 return new DefaultControllerMessageHandler(message, context);
50 }
51
52 @Override
53 public String getMessageType()
54 {
55 return MessageType.CONTROLLER_MSG.toString();
56 }
57
58 @Override
59 public void reset()
60 {
61
62 }
63
64 public static class DefaultControllerMessageHandler extends MessageHandler
65 {
66 public DefaultControllerMessageHandler(Message message,
67 NotificationContext context)
68 {
69 super(message, context);
70 }
71
72 @Override
73 public HelixTaskResult handleMessage() throws InterruptedException
74 {
75 String type = _message.getMsgType();
76 HelixTaskResult result = new HelixTaskResult();
77 if(!type.equals(MessageType.CONTROLLER_MSG.toString()))
78 {
79 throw new HelixException("Unexpected msg type for message "+_message.getMsgId()
80 +" type:" + _message.getMsgType());
81 }
82 result.getTaskResultMap().put("ControllerResult", "msg "+ _message.getMsgId() + " from "+_message.getMsgSrc() + " processed");
83 result.setSuccess(true);
84 return result;
85 }
86
87 @Override
88 public void onError(Exception e, ErrorCode code, ErrorType type)
89 {
90 _logger.error("Message handling pipeline get an exception. MsgId:" + _message.getMsgId(), e);
91 }
92 }
93 }