View Javadoc

1   package org.apache.helix.filestore;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.util.Collection;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  
26  import org.apache.commons.jci.listeners.AbstractFilesystemAlterationListener;
27  import org.apache.commons.jci.monitor.FilesystemAlterationMonitor;
28  
29  public class FileSystemWatchService
30  {
31    enum ChangeType
32    {
33      CREATE, DELETE, MODIFY
34    };
35  
36    private FilesystemAlterationMonitor fam;
37    private MyFilesystemAlterationListener listener;
38    private Thread thread;
39  
40    public FileSystemWatchService(String root, FileChangeWatcher... watchers)
41    {
42      this(root, -1, watchers);
43    }
44  
45    public FileSystemWatchService(String root, long startTime,
46        FileChangeWatcher... watchers)
47    {
48      File file = new File(root);
49      System.out.println("Setting up watch service for path:"
50          + file.getAbsolutePath());
51      fam = new FilesystemAlterationMonitor();
52      listener = new MyFilesystemAlterationListener(root, startTime, watchers);
53      fam.addListener(file, listener);
54    }
55  
56    public void start()
57    {
58      fam.start();
59      thread = new Thread(listener);
60      thread.start();
61    }
62  
63    static class MyFilesystemAlterationListener extends
64        AbstractFilesystemAlterationListener implements Runnable
65    {
66      private final FileChangeWatcher[] watchers;
67      private int length;
68      private final long startTime;
69      private AtomicBoolean stopRequest;
70  
71      public MyFilesystemAlterationListener(String root, long startTime,
72          FileChangeWatcher[] watchers)
73      {
74        this.startTime = startTime;
75        File file = new File(root);
76        length = root.length() + 1;
77        this.watchers = watchers;
78        stopRequest = new AtomicBoolean(false);
79        
80      }
81  
82      @SuppressWarnings("unchecked")
83      public void run()
84      {
85        while (!stopRequest.get())
86        {
87          try
88          {
89            waitForCheck();
90            process(getCreatedDirectories(), watchers, ChangeType.CREATE);
91            process(getDeletedDirectories(), watchers, ChangeType.DELETE);
92            process(getChangedDirectories(), watchers, ChangeType.MODIFY);
93            process(getCreatedFiles(), watchers, ChangeType.CREATE);
94            process(getDeletedFiles(), watchers, ChangeType.DELETE);
95            process(getChangedFiles(), watchers, ChangeType.MODIFY);
96  
97          } catch (Exception e)
98          {
99            e.printStackTrace();
100         }
101       }
102     }
103 
104     private void process(Collection<File> files, FileChangeWatcher[] watchers,
105         ChangeType type)
106     {
107       if (files.size() > 0)
108       {
109         for (File file : files)
110         {
111           if (file.lastModified() < startTime)
112           {
113             continue;
114           }
115           String path = file.getPath();
116           String relativePath = ".";
117           if (path.length() > length)
118           {
119             relativePath = path.substring(length);
120           }
121           for (FileChangeWatcher watcher : watchers)
122           {
123             switch (type)
124             {
125             case CREATE:
126               watcher.onEntryAdded(relativePath);
127               break;
128             case DELETE:
129               watcher.onEntryDeleted(relativePath);
130               break;
131             case MODIFY:
132               watcher.onEntryModified(relativePath);
133               break;
134             }
135           }
136         }
137       }
138     }
139 
140     public void stop()
141     {
142       stopRequest.set(true);
143     }
144   }
145 
146   public void stop()
147   {
148     listener.stop();
149     fam.stop();
150     thread.interrupt();
151   }
152 
153   public static void main(String[] args) throws Exception
154   {
155     FileChangeWatcher defaultWatcher = new FileChangeWatcher()
156     {
157 
158       @Override
159       public void onEntryModified(String path)
160       {
161         System.out
162             .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryModified():"
163                 + path);
164       }
165 
166       @Override
167       public void onEntryDeleted(String path)
168       {
169         System.out
170             .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryDeleted():"
171                 + path);
172       }
173 
174       @Override
175       public void onEntryAdded(String path)
176       {
177         System.out
178             .println("FileSystemWatchService.main(...).new FileChangeWatcher() {...}.onEntryAdded() : "
179                 + path);
180       }
181     };
182     ChangeLogGenerator ChangeLogGenerator = new ChangeLogGenerator(
183         "data/localhost_12000/translog", 1, 1);
184     FileSystemWatchService watchService = new FileSystemWatchService(
185         "data/localhost_12000/filestore", defaultWatcher);
186     watchService.start();
187     Thread.sleep(10000);
188     watchService.stop();
189   }
190 }