View Javadoc

1   package org.apache.helix.filestore;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.io.FileNotFoundException;
24  import java.io.IOException;
25  import java.io.RandomAccessFile;
26  import java.nio.charset.Charset;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.List;
30  import java.util.concurrent.locks.Condition;
31  import java.util.concurrent.locks.Lock;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  public class ChangeLogReader implements FileChangeWatcher
35  {
36    int MAX_ENTRIES_TO_READ = 100;
37    private final String changeLogDir;
38    Lock lock;
39    private Condition condition;
40  
41    public ChangeLogReader(String changeLogDir)
42    {
43      this.changeLogDir = changeLogDir;
44      lock = new ReentrantLock();
45      condition = lock.newCondition();
46  
47    }
48  
49    /**
50     * Blocking call
51     * 
52     * @param record
53     * @return
54     */
55    public List<ChangeRecord> getChangeSince(ChangeRecord record)
56    {
57      List<ChangeRecord> changes = new ArrayList<ChangeRecord>();
58      String fileName;
59      long endOffset;
60      if (record == null)
61      {
62        fileName = "log.1";
63        endOffset = 0;
64      } else
65      {
66        fileName = record.changeLogFileName;
67        endOffset = record.endOffset;
68      }
69      try
70      {
71        lock.lock();
72        
73        File file;
74        file = new File(changeLogDir + "/" + fileName);
75        while (!file.exists() || file.length() <= endOffset)
76        {
77       // wait
78          try
79          {
80            System.out.println("Waiting for new changes");
81            condition.await();
82            System.out.println("Detected changes");
83          } catch (InterruptedException e)
84          {
85            e.printStackTrace();
86          }
87        }
88        RandomAccessFile raf = new RandomAccessFile(
89            changeLogDir + "/" + fileName, "r");
90        raf.seek(endOffset);
91        // out.writeLong(record.txid);
92        // out.writeShort(record.type);
93        // out.writeLong(record.timestamp);
94        // out.writeUTF(record.file);
95  
96        int count = 0;
97       do {
98          ChangeRecord newRecord = new ChangeRecord();
99          newRecord.changeLogFileName = fileName;
100         newRecord.startOffset = raf.getFilePointer();
101         newRecord.txid = raf.readLong();
102         newRecord.type = raf.readShort();
103         newRecord.timestamp = raf.readLong();
104         newRecord.file = raf.readUTF();
105         newRecord.endOffset = raf.getFilePointer();
106         changes.add(newRecord);
107         count++;
108       }while (count < MAX_ENTRIES_TO_READ && raf.getFilePointer()< raf.length());
109     } catch (FileNotFoundException e)
110     {
111       e.printStackTrace();
112     } catch (IOException e)
113     {
114       e.printStackTrace();
115     } finally
116     {
117       lock.unlock();
118     }
119     return changes;
120   }
121 
122   @Override
123   public void onEntryModified(String path)
124   {
125     try
126     {
127       lock.lock();
128       condition.signalAll();
129     } catch (Exception e)
130     {
131       // TODO: handle exception
132     } finally
133     {
134       lock.unlock();
135     }
136   }
137 
138   @Override
139   public void onEntryAdded(String path)
140   {
141     try
142     {
143       lock.lock();
144       condition.signalAll();
145     } catch (Exception e)
146     {
147       // TODO: handle exception
148     } finally
149     {
150       lock.unlock();
151     }
152   }
153 
154   @Override
155   public void onEntryDeleted(String path)
156   {
157     try
158     {
159       lock.lock();
160       condition.signalAll();
161     } catch (Exception e)
162     {
163       // TODO: handle exception
164     } finally
165     {
166       lock.unlock();
167     }
168   }
169 }