View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Arrays;
23  
24  import org.apache.helix.HelixException;
25  import org.apache.helix.HelixManager;
26  import org.apache.helix.NotificationContext;
27  import org.apache.helix.messaging.handling.HelixTaskResult;
28  import org.apache.helix.messaging.handling.MessageHandler;
29  import org.apache.helix.messaging.handling.MessageHandlerFactory;
30  import org.apache.helix.model.Message;
31  import org.apache.log4j.Logger;
32  
33  
34  /**
35   * DefaultParticipantErrorMessageHandlerFactory works on controller side.
36   * When the participant detects a critical error, it will send the PARTICIPANT_ERROR_REPORT
37   * Message to the controller, specifying whether it want to disable the instance or
38   * disable the partition. The controller have a chance to do whatever make sense at that point,
39   * and then disable the corresponding partition or the instance. More configs per resource will
40   * be added to customize the controller behavior.
41   * */
42  public class DefaultParticipantErrorMessageHandlerFactory implements
43    MessageHandlerFactory
44  {
45    public enum ActionOnError
46    {
47      DISABLE_PARTITION, DISABLE_RESOURCE, DISABLE_INSTANCE
48    }
49  
50    public static final String ACTIONKEY = "ActionOnError";
51  
52    private static Logger _logger = Logger
53      .getLogger(DefaultParticipantErrorMessageHandlerFactory.class);
54    final HelixManager _manager;
55  
56    public DefaultParticipantErrorMessageHandlerFactory(HelixManager manager)
57    {
58      _manager = manager;
59    }
60  
61    public static class DefaultParticipantErrorMessageHandler extends MessageHandler
62    {
63      final HelixManager _manager;
64      public DefaultParticipantErrorMessageHandler(Message message,
65          NotificationContext context,  HelixManager manager)
66      {
67         super(message, context);
68         _manager = manager;
69      }
70  
71      @Override
72      public HelixTaskResult handleMessage() throws InterruptedException
73      {
74        HelixTaskResult result = new HelixTaskResult();
75        result.setSuccess(true);
76        // TODO : consider unify this with StatsAggregationStage.executeAlertActions()
77        try
78        {
79          ActionOnError actionOnError
80            = ActionOnError.valueOf(_message.getRecord().getSimpleField(ACTIONKEY));
81  
82          if(actionOnError == ActionOnError.DISABLE_INSTANCE)
83          {
84            _manager.getClusterManagmentTool().enableInstance(_manager.getClusterName(), _message.getMsgSrc(), false);
85            _logger.info("Instance " + _message.getMsgSrc() + " disabled");
86          }
87          else if(actionOnError == ActionOnError.DISABLE_PARTITION)
88          {
89            _manager.getClusterManagmentTool().enablePartition(false, _manager.getClusterName(), _message.getMsgSrc(),
90                _message.getResourceName(), Arrays.asList( _message.getPartitionName()));
91            _logger.info("partition " + _message.getPartitionName() + " disabled");
92          }
93          else if (actionOnError == ActionOnError.DISABLE_RESOURCE)
94          {
95            // NOT IMPLEMENTED, or we can disable all partitions
96            //_manager.getClusterManagmentTool().en(_manager.getClusterName(), _manager.getInstanceName(),
97            //    _message.getResourceName(), _message.getPartitionName(), false);
98            _logger.info("resource " + _message.getResourceName() + " disabled");
99          }
100       }
101       catch(Exception e)
102       {
103         _logger.error("", e);
104         result.setSuccess(false);
105         result.setException(e);
106       }
107       return result;
108     }
109 
110     @Override
111     public void onError(Exception e, ErrorCode code, ErrorType type)
112     {
113       _logger.error("Message handling pipeline get an exception. MsgId:"
114           + _message.getMsgId(), e);
115     }
116 
117   }
118 
119   @Override
120   public MessageHandler createHandler(Message message,
121       NotificationContext context)
122   {
123     String type = message.getMsgType();
124 
125     if(!type.equals(getMessageType()))
126     {
127       throw new HelixException("Unexpected msg type for message "+message.getMsgId()
128           +" type:" + message.getMsgType());
129     }
130 
131     return new DefaultParticipantErrorMessageHandler(message, context, _manager);
132   }
133 
134   @Override
135   public String getMessageType()
136   {
137     return Message.MessageType.PARTICIPANT_ERROR_REPORT.toString();
138   }
139 
140   @Override
141   public void reset()
142   {
143 
144   }
145 
146 }