View Javadoc

1   package org.apache.helix.recipes.rabbitmq;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.log4j.Logger;
23  
24  import org.apache.helix.NotificationContext;
25  import org.apache.helix.model.Message;
26  import org.apache.helix.participant.statemachine.StateModel;
27  import org.apache.helix.participant.statemachine.StateModelInfo;
28  import org.apache.helix.participant.statemachine.Transition;
29  
30  @StateModelInfo(initialState = "OFFLINE", states = { "ONLINE", "ERROR" })
31  public class ConsumerStateModel extends StateModel
32  {
33    private static Logger LOG = Logger.getLogger(ConsumerStateModel.class);
34  
35    private final String _consumerId;
36    private final String _partition;
37  
38    private final String _mqServer;
39    private ConsumerThread _thread = null;
40  
41    public ConsumerStateModel(String consumerId, String partition, String mqServer)
42    {
43      _partition = partition;
44      _consumerId = consumerId;
45      _mqServer = mqServer;
46    }
47  
48    @Transition(to = "ONLINE", from = "OFFLINE")
49    public void onBecomeOnlineFromOffline(Message message, NotificationContext context)
50    {
51      LOG.debug(_consumerId + " becomes ONLINE from OFFLINE for " + _partition);
52  
53      if (_thread == null)
54      {
55        LOG.debug("Starting ConsumerThread for " + _partition + "...");
56        _thread = new ConsumerThread(_partition, _mqServer, _consumerId);
57        _thread.start();
58        LOG.debug("Starting ConsumerThread for " + _partition + " done");
59  
60      }
61    }
62  
63    @Transition(to = "OFFLINE", from = "ONLINE")
64    public void onBecomeOfflineFromOnline(Message message, NotificationContext context)
65        throws InterruptedException
66    {
67      LOG.debug(_consumerId + " becomes OFFLINE from ONLINE for " + _partition);
68  
69      if (_thread != null)
70      {
71        LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
72  
73        _thread.interrupt();
74        _thread.join(2000);
75        _thread = null;
76        LOG.debug("Stopping " +  _consumerId + " for " + _partition + " done");
77  
78      }
79    }
80  
81    @Transition(to = "DROPPED", from = "OFFLINE")
82    public void onBecomeDroppedFromOffline(Message message, NotificationContext context)
83    {
84      LOG.debug(_consumerId + " becomes DROPPED from OFFLINE for " + _partition);
85    }
86  
87    @Transition(to = "OFFLINE", from = "ERROR")
88    public void onBecomeOfflineFromError(Message message, NotificationContext context)
89    {
90      LOG.debug(_consumerId + " becomes OFFLINE from ERROR for " + _partition);
91    }
92  
93    @Override
94    public void reset()
95    {
96      LOG.warn("Default reset() invoked");
97      
98      if (_thread != null)
99      {
100       LOG.debug("Stopping " + _consumerId + " for " + _partition + "...");
101 
102       _thread.interrupt();
103       try
104       {
105         _thread.join(2000);
106       } catch (InterruptedException e)
107       {
108         // TODO Auto-generated catch block
109         e.printStackTrace();
110       }
111       _thread = null;
112       LOG.debug("Stopping " +  _consumerId + " for " + _partition + " done");
113 
114     }
115   }
116 }