1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Arrays;
23
24 import org.apache.helix.HelixException;
25 import org.apache.helix.HelixManager;
26 import org.apache.helix.NotificationContext;
27 import org.apache.helix.messaging.handling.HelixTaskResult;
28 import org.apache.helix.messaging.handling.MessageHandler;
29 import org.apache.helix.messaging.handling.MessageHandlerFactory;
30 import org.apache.helix.model.Message;
31 import org.apache.log4j.Logger;
32
33
34
35
36
37
38
39
40
41
42 public class DefaultParticipantErrorMessageHandlerFactory implements
43 MessageHandlerFactory
44 {
45 public enum ActionOnError
46 {
47 DISABLE_PARTITION, DISABLE_RESOURCE, DISABLE_INSTANCE
48 }
49
50 public static final String ACTIONKEY = "ActionOnError";
51
52 private static Logger _logger = Logger
53 .getLogger(DefaultParticipantErrorMessageHandlerFactory.class);
54 final HelixManager _manager;
55
56 public DefaultParticipantErrorMessageHandlerFactory(HelixManager manager)
57 {
58 _manager = manager;
59 }
60
61 public static class DefaultParticipantErrorMessageHandler extends MessageHandler
62 {
63 final HelixManager _manager;
64 public DefaultParticipantErrorMessageHandler(Message message,
65 NotificationContext context, HelixManager manager)
66 {
67 super(message, context);
68 _manager = manager;
69 }
70
71 @Override
72 public HelixTaskResult handleMessage() throws InterruptedException
73 {
74 HelixTaskResult result = new HelixTaskResult();
75 result.setSuccess(true);
76
77 try
78 {
79 ActionOnError actionOnError
80 = ActionOnError.valueOf(_message.getRecord().getSimpleField(ACTIONKEY));
81
82 if(actionOnError == ActionOnError.DISABLE_INSTANCE)
83 {
84 _manager.getClusterManagmentTool().enableInstance(_manager.getClusterName(), _message.getMsgSrc(), false);
85 _logger.info("Instance " + _message.getMsgSrc() + " disabled");
86 }
87 else if(actionOnError == ActionOnError.DISABLE_PARTITION)
88 {
89 _manager.getClusterManagmentTool().enablePartition(false, _manager.getClusterName(), _message.getMsgSrc(),
90 _message.getResourceName(), Arrays.asList( _message.getPartitionName()));
91 _logger.info("partition " + _message.getPartitionName() + " disabled");
92 }
93 else if (actionOnError == ActionOnError.DISABLE_RESOURCE)
94 {
95
96
97
98 _logger.info("resource " + _message.getResourceName() + " disabled");
99 }
100 }
101 catch(Exception e)
102 {
103 _logger.error("", e);
104 result.setSuccess(false);
105 result.setException(e);
106 }
107 return result;
108 }
109
110 @Override
111 public void onError(Exception e, ErrorCode code, ErrorType type)
112 {
113 _logger.error("Message handling pipeline get an exception. MsgId:"
114 + _message.getMsgId(), e);
115 }
116
117 }
118
119 @Override
120 public MessageHandler createHandler(Message message,
121 NotificationContext context)
122 {
123 String type = message.getMsgType();
124
125 if(!type.equals(getMessageType()))
126 {
127 throw new HelixException("Unexpected msg type for message "+message.getMsgId()
128 +" type:" + message.getMsgType());
129 }
130
131 return new DefaultParticipantErrorMessageHandler(message, context, _manager);
132 }
133
134 @Override
135 public String getMessageType()
136 {
137 return Message.MessageType.PARTICIPANT_ERROR_REPORT.toString();
138 }
139
140 @Override
141 public void reset()
142 {
143
144 }
145
146 }