View Javadoc

1   package org.apache.helix.filestore;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.I0Itec.zkclient.DataUpdater;
23  import org.apache.log4j.Logger;
24  import org.apache.zookeeper.data.Stat;
25  
26  import org.apache.helix.AccessOption;
27  import org.apache.helix.HelixManager;
28  import org.apache.helix.NotificationContext;
29  import org.apache.helix.ZNRecord;
30  import org.apache.helix.model.InstanceConfig;
31  import org.apache.helix.model.Message;
32  import org.apache.helix.participant.statemachine.StateModel;
33  import org.apache.helix.participant.statemachine.StateModelInfo;
34  import org.apache.helix.participant.statemachine.Transition;
35  import org.apache.helix.store.zk.ZkHelixPropertyStore;
36  
37  @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "MASTER",
38      "SLAVE" })
39  public class FileStoreStateModel extends StateModel
40  {
41    private final class HighWaterMarkUpdater implements DataUpdater<ZNRecord>
42    {
43      private final Message message;
44      private final ChangeRecord lastRecordProcessed;
45  
46      private HighWaterMarkUpdater(Message message,
47          ChangeRecord lastRecordProcessed)
48      {
49        this.message = message;
50        this.lastRecordProcessed = lastRecordProcessed;
51      }
52  
53      @Override
54      public ZNRecord update(ZNRecord currentData)
55      {
56        ZNRecord newRec = new ZNRecord(message.getResourceName());
57  
58        if (currentData != null)
59        {
60          int currentGen = convertToInt(newRec.getSimpleField("currentGen"), 0);
61          int currentGenStartSeq = convertToInt(
62              newRec.getSimpleField("currentGenStartSeq"), 0);
63          int prevGen = convertToInt(newRec.getSimpleField("prevGen"), 0);
64          int prevGenEndSeq = convertToInt(
65              newRec.getSimpleField("prevGenEndSeq"), 0);
66          newRec.setSimpleField("currentGen", Integer.toString(currentGen + 1));
67          newRec.setSimpleField("currentGenStartSeq", Integer.toString(1));
68          if (currentGen > 0)
69          {
70            newRec.setSimpleField("prevGen", Integer.toString(currentGen));
71            int localEndSeq = 1;
72            if (lastRecordProcessed != null)
73            {
74              localEndSeq = (int) lastRecordProcessed.txid;
75            }
76            newRec.setSimpleField("prevGenEndSeq", "" + localEndSeq);
77          }
78          newRec.merge(currentData);
79        } else
80        {
81          newRec.setSimpleField("currentGen", Integer.toString(1));
82          newRec.setSimpleField("currentGenStartSeq", Integer.toString(1));
83        }
84        return newRec;
85  
86      }
87  
88      private int convertToInt(String number, int defaultValue)
89      {
90        try
91        {
92          if (number != null)
93          {
94            return Integer.parseInt(number);
95          }
96        } catch (Exception e)
97        {
98  
99        }
100       return defaultValue;
101     }
102   }
103 
104   private static Logger LOG = Logger.getLogger(FileStoreStateModel.class);
105 
106   private final String _serverId;
107   private final String _partition;
108 
109   private Replicator replicator;
110 
111   private ChangeLogGenerator generator;
112 
113   private FileSystemWatchService service;
114 
115   private InstanceConfig instanceConfig;
116 
117   public FileStoreStateModel(HelixManager manager, String resource,
118       String partition) 
119   {
120     String clusterName = manager.getClusterName();
121     String instanceName = manager.getInstanceName();
122     instanceConfig = manager.getClusterManagmentTool().getInstanceConfig(
123         clusterName, instanceName);
124     replicator = new Replicator(instanceConfig, resource, partition);
125     try
126     {
127       manager.addExternalViewChangeListener(replicator);
128     } catch (Exception e)
129     {
130       e.printStackTrace();
131     }
132     _partition = partition;
133     _serverId = instanceName;
134   }
135 
136   /**
137    * If the node is slave, start the rsync thread if it is not started
138    * 
139    * @param message
140    * @param context
141    * @throws Exception
142    */
143 
144   @Transition(from = "OFFLINE", to = "SLAVE")
145   public void onBecomeSlaveFromOffline(Message message,
146       NotificationContext context) throws Exception
147   {
148     System.out.println(_serverId + " transitioning from " + message.getFromState()
149         + " to " + message.getToState() + " for " + _partition);
150 
151     replicator.start();
152     System.out.println(_serverId + " transitioned from " + message.getFromState()
153         + " to " + message.getToState() + " for " + _partition);
154   }
155 
156   /**
157    * When the node becomes master, it will start accepting writes and increments
158    * the epoch and starts logging the changes in a file
159    * 
160    * @param message
161    * @param context
162    * @throws Exception
163    */
164   @Transition(from = "SLAVE", to = "MASTER")
165   public void onBecomeMasterFromSlave(final Message message,
166       NotificationContext context) throws Exception
167   {
168     replicator.stop();
169     System.out.println(_serverId + " transitioning from " + message.getFromState()
170         + " to " + message.getToState() + " for " + _partition);
171     ZkHelixPropertyStore<ZNRecord> helixPropertyStore = context.getManager()
172         .getHelixPropertyStore();
173     String checkpointDirPath = instanceConfig.getRecord().getSimpleField(
174         "check_point_dir");
175     CheckpointFile checkpointFile = new CheckpointFile(checkpointDirPath);
176     final ChangeRecord lastRecordProcessed = checkpointFile
177         .findLastRecordProcessed();
178     DataUpdater<ZNRecord> updater = new HighWaterMarkUpdater(message,
179         lastRecordProcessed);
180     helixPropertyStore.update(
181         "TRANSACTION_ID_METADATA" + "/" + message.getResourceName(), updater,
182         AccessOption.PERSISTENT);
183     Stat stat = new Stat();
184     ;
185     ZNRecord znRecord = helixPropertyStore.get("TRANSACTION_ID_METADATA" + "/"
186         + message.getResourceName(), stat, AccessOption.PERSISTENT);
187     int startGen = Integer.parseInt(znRecord.getSimpleField("currentGen"));
188     int startSeq = Integer.parseInt(znRecord
189         .getSimpleField("currentGenStartSeq"));
190     String fileStoreDir = instanceConfig.getRecord().getSimpleField("file_store_dir");
191     String changeLogDir = instanceConfig.getRecord().getSimpleField("change_log_dir");
192 
193     generator = new ChangeLogGenerator(changeLogDir, startGen, startSeq);
194     //To indicate that we need callbacks for changes that happen starting now
195     long now = System.currentTimeMillis();
196     service = new FileSystemWatchService(fileStoreDir, now, generator);
197     service.start();
198     System.out.println(_serverId + " transitioned from " + message.getFromState()
199         + " to " + message.getToState() + " for " + _partition);
200   }
201 
202   /**
203    * Stop writing
204    * 
205    * @param message
206    * @param context
207    * @throws Exception
208    */
209 
210   @Transition(from = "MASTER", to = "SLAVE")
211   public void onBecomeSlaveFromMaster(Message message,
212       NotificationContext context) throws Exception
213   {
214     service.stop();
215     LOG.info(_serverId + " transitioning from " + message.getFromState()
216         + " to " + message.getToState() + " for " + _partition);
217     replicator.start();
218   }
219 
220   @Transition(from = "SLAVE", to = "OFFLINE")
221   public void onBecomeOfflineFromSlave(Message message,
222       NotificationContext context)
223   {
224     replicator.stop();
225     LOG.info(_serverId + " transitioning from " + message.getFromState()
226         + " to " + message.getToState() + " for " + _partition);
227   }
228 
229   public void onBecomeDroppedFromOffline(Message message,
230       NotificationContext context)
231   {
232     LOG.info(_serverId + " Dropping partition " + _partition);
233   }
234 
235   @Override
236   public void reset()
237   {
238     LOG.warn("Default reset() invoked");
239   }
240 }