1 package org.apache.helix.filestore;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.File;
23 import java.io.FileWriter;
24 import java.io.IOException;
25 import java.util.Date;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.TreeSet;
29 import java.util.concurrent.atomic.AtomicBoolean;
30
31 import org.apache.commons.io.FileUtils;
32
33
34
35
36
37
38
39 public class ChangeLogProcessor implements Runnable
40 {
41 private final ChangeLogReader reader;
42 RsyncInvoker rsyncInvoker;
43 private AtomicBoolean shutdownRequested;
44 private CheckpointFile checkpointFile;
45 private Thread thread;
46
47 public ChangeLogProcessor(ChangeLogReader reader, String remoteHost,
48 String remoteBaseDir, String localBaseDir, String checkpointDirPath)
49 throws Exception
50 {
51 this.reader = reader;
52 checkpointFile = new CheckpointFile(checkpointDirPath);
53
54 shutdownRequested = new AtomicBoolean(false);
55 rsyncInvoker = new RsyncInvoker(remoteHost, remoteBaseDir, localBaseDir);
56 }
57
58 public void start()
59 {
60 thread = new Thread(this);
61 thread.start();
62 }
63
64 public void run()
65 {
66 try
67 {
68 ChangeRecord lastRecordProcessed = checkpointFile.findLastRecordProcessed();
69 do
70 {
71 try
72 {
73 List<ChangeRecord> changes = reader
74 .getChangeSince(lastRecordProcessed);
75 Set<String> paths = getRemotePathsToSync(changes);
76 for (String path : paths)
77 {
78 rsyncInvoker.rsync(path);
79 }
80 lastRecordProcessed = changes.get(changes.size() - 1);
81 checkpointFile.checkpoint(lastRecordProcessed);
82 } catch (Exception e)
83 {
84 e.printStackTrace();
85 }
86 } while (!shutdownRequested.get());
87 } catch (Exception e)
88 {
89 e.printStackTrace();
90 }
91 }
92
93
94
95 private Set<String> getRemotePathsToSync(List<ChangeRecord> changes)
96 {
97 Set<String> paths = new TreeSet<String>();
98 for (ChangeRecord change : changes)
99 {
100 paths.add(change.file);
101 }
102 return paths;
103 }
104
105 public void stop()
106 {
107 shutdownRequested.set(true);
108 thread.interrupt();
109 }
110
111 }