View Javadoc

1   package org.apache.helix.filestore;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.io.FileWriter;
24  import java.io.IOException;
25  import java.util.Date;
26  import java.util.List;
27  import java.util.Set;
28  import java.util.TreeSet;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  
31  import org.apache.commons.io.FileUtils;
32  
33  /**
34   * Processes the change log and invokes rsync for every change on the remote
35   * machine
36   * 
37   *
38   */
39  public class ChangeLogProcessor implements Runnable
40  {
41    private final ChangeLogReader reader;
42    RsyncInvoker rsyncInvoker;
43    private AtomicBoolean shutdownRequested;
44    private CheckpointFile checkpointFile;
45    private Thread thread;
46  
47    public ChangeLogProcessor(ChangeLogReader reader, String remoteHost,
48        String remoteBaseDir, String localBaseDir, String checkpointDirPath)
49        throws Exception
50    {
51      this.reader = reader;
52      checkpointFile = new CheckpointFile(checkpointDirPath);
53  
54      shutdownRequested = new AtomicBoolean(false);
55      rsyncInvoker = new RsyncInvoker(remoteHost, remoteBaseDir, localBaseDir);
56    }
57  
58    public void start()
59    {
60      thread = new Thread(this);
61      thread.start();
62    }
63  
64    public void run()
65    {
66      try
67      {
68        ChangeRecord lastRecordProcessed = checkpointFile.findLastRecordProcessed();
69        do
70        {
71          try
72          {
73            List<ChangeRecord> changes = reader
74                .getChangeSince(lastRecordProcessed);
75            Set<String> paths = getRemotePathsToSync(changes);
76            for (String path : paths)
77            {
78              rsyncInvoker.rsync(path);
79            }
80            lastRecordProcessed = changes.get(changes.size() - 1);
81            checkpointFile.checkpoint(lastRecordProcessed);
82          } catch (Exception e)
83          {
84            e.printStackTrace();
85          }
86        } while (!shutdownRequested.get());
87      } catch (Exception e)
88      {
89        e.printStackTrace();
90      }
91    }
92  
93  
94  
95    private Set<String> getRemotePathsToSync(List<ChangeRecord> changes)
96    {
97      Set<String> paths = new TreeSet<String>();
98      for (ChangeRecord change : changes)
99      {
100       paths.add(change.file);
101     }
102     return paths;
103   }
104 
105   public void stop()
106   {
107     shutdownRequested.set(true);
108     thread.interrupt();
109   }
110 
111 }