1 package org.apache.helix.filestore;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.File;
23 import java.util.Collection;
24 import java.util.concurrent.atomic.AtomicBoolean;
25
26 import org.apache.commons.jci.listeners.AbstractFilesystemAlterationListener;
27 import org.apache.commons.jci.monitor.FilesystemAlterationMonitor;
28
29 public class FileSystemWatchService
30 {
31 enum ChangeType
32 {
33 CREATE, DELETE, MODIFY
34 };
35
36 private FilesystemAlterationMonitor fam;
37 private MyFilesystemAlterationListener listener;
38 private Thread thread;
39
40 public FileSystemWatchService(String root, FileChangeWatcher... watchers)
41 {
42 this(root, -1, watchers);
43 }
44
45 public FileSystemWatchService(String root, long startTime,
46 FileChangeWatcher... watchers)
47 {
48 File file = new File(root);
49 System.out.println("Setting up watch service for path:"
50 + file.getAbsolutePath());
51 fam = new FilesystemAlterationMonitor();
52 listener = new MyFilesystemAlterationListener(root, startTime, watchers);
53 fam.addListener(file, listener);
54 }
55
56 public void start()
57 {
58 fam.start();
59 thread = new Thread(listener);
60 thread.start();
61 }
62
63 static class MyFilesystemAlterationListener extends
64 AbstractFilesystemAlterationListener implements Runnable
65 {
66 private final FileChangeWatcher[] watchers;
67 private int length;
68 private final long startTime;
69 private AtomicBoolean stopRequest;
70
71 public MyFilesystemAlterationListener(String root, long startTime,
72 FileChangeWatcher[] watchers)
73 {
74 this.startTime = startTime;
75 File file = new File(root);
76 length = root.length() + 1;
77 this.watchers = watchers;
78 stopRequest = new AtomicBoolean(false);
79
80 }
81
82 @SuppressWarnings("unchecked")
83 public void run()
84 {
85 while (!stopRequest.get())
86 {
87 try
88 {
89 waitForCheck();
90 process(getCreatedDirectories(), watchers, ChangeType.CREATE);
91 process(getDeletedDirectories(), watchers, ChangeType.DELETE);
92 process(getChangedDirectories(), watchers, ChangeType.MODIFY);
93 process(getCreatedFiles(), watchers, ChangeType.CREATE);
94 process(getDeletedFiles(), watchers, ChangeType.DELETE);
95 process(getChangedFiles(), watchers, ChangeType.MODIFY);
96
97 } catch (Exception e)
98 {
99 e.printStackTrace();
100 }
101 }
102 }
103
104 private void process(Collection<File> files, FileChangeWatcher[] watchers,
105 ChangeType type)
106 {
107 if (files.size() > 0)
108 {
109 for (File file : files)
110 {
111 if (file.lastModified() < startTime)
112 {
113 continue;
114 }
115 String path = file.getPath();
116 String relativePath = ".";
117 if (path.length() > length)
118 {
119 relativePath = path.substring(length);
120 }
121 for (FileChangeWatcher watcher : watchers)
122 {
123 switch (type)
124 {
125 case CREATE:
126 watcher.onEntryAdded(relativePath);
127 break;
128 case DELETE:
129 watcher.onEntryDeleted(relativePath);
130 break;
131 case MODIFY:
132 watcher.onEntryModified(relativePath);
133 break;
134 }
135 }
136 }
137 }
138 }
139
140 public void stop()
141 {
142 stopRequest.set(true);
143 }
144 }
145
146 public void stop()
147 {
148 listener.stop();
149 fam.stop();
150 thread.interrupt();
151 }
152
153 public static void main(String[] args) throws Exception
154 {
155 FileChangeWatcher defaultWatcher = new FileChangeWatcher()
156 {
157
158 @Override
159 public void onEntryModified(String path)
160 {
161 System.out
162 .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryModified():"
163 + path);
164 }
165
166 @Override
167 public void onEntryDeleted(String path)
168 {
169 System.out
170 .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryDeleted():"
171 + path);
172 }
173
174 @Override
175 public void onEntryAdded(String path)
176 {
177 System.out
178 .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryAdded() : "
179 + path);
180 }
181 };
182 ChangeLogGenerator ChangeLogGenerator = new ChangeLogGenerator(
183 "data/localhost_12000/translog", 1, 1);
184 FileSystemWatchService watchService = new FileSystemWatchService(
185 "data/localhost_12000/filestore", defaultWatcher);
186 watchService.start();
187 Thread.sleep(10000);
188 watchService.stop();
189 }
190 }