1 package org.apache.helix.filestore;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.File;
23 import java.io.FileNotFoundException;
24 import java.io.IOException;
25 import java.io.RandomAccessFile;
26 import java.nio.charset.Charset;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.concurrent.locks.Condition;
31 import java.util.concurrent.locks.Lock;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 public class ChangeLogReader implements FileChangeWatcher
35 {
36 int MAX_ENTRIES_TO_READ = 100;
37 private final String changeLogDir;
38 Lock lock;
39 private Condition condition;
40
41 public ChangeLogReader(String changeLogDir)
42 {
43 this.changeLogDir = changeLogDir;
44 lock = new ReentrantLock();
45 condition = lock.newCondition();
46
47 }
48
49
50
51
52
53
54
55 public List<ChangeRecord> getChangeSince(ChangeRecord record)
56 {
57 List<ChangeRecord> changes = new ArrayList<ChangeRecord>();
58 String fileName;
59 long endOffset;
60 if (record == null)
61 {
62 fileName = "log.1";
63 endOffset = 0;
64 } else
65 {
66 fileName = record.changeLogFileName;
67 endOffset = record.endOffset;
68 }
69 try
70 {
71 lock.lock();
72
73 File file;
74 file = new File(changeLogDir + "/" + fileName);
75 while (!file.exists() || file.length() <= endOffset)
76 {
77
78 try
79 {
80 System.out.println("Waiting for new changes");
81 condition.await();
82 System.out.println("Detected changes");
83 } catch (InterruptedException e)
84 {
85 e.printStackTrace();
86 }
87 }
88 RandomAccessFile raf = new RandomAccessFile(
89 changeLogDir + "/" + fileName, "r");
90 raf.seek(endOffset);
91
92
93
94
95
96 int count = 0;
97 do {
98 ChangeRecord newRecord = new ChangeRecord();
99 newRecord.changeLogFileName = fileName;
100 newRecord.startOffset = raf.getFilePointer();
101 newRecord.txid = raf.readLong();
102 newRecord.type = raf.readShort();
103 newRecord.timestamp = raf.readLong();
104 newRecord.file = raf.readUTF();
105 newRecord.endOffset = raf.getFilePointer();
106 changes.add(newRecord);
107 count++;
108 }while (count < MAX_ENTRIES_TO_READ && raf.getFilePointer()< raf.length());
109 } catch (FileNotFoundException e)
110 {
111 e.printStackTrace();
112 } catch (IOException e)
113 {
114 e.printStackTrace();
115 } finally
116 {
117 lock.unlock();
118 }
119 return changes;
120 }
121
122 @Override
123 public void onEntryModified(String path)
124 {
125 try
126 {
127 lock.lock();
128 condition.signalAll();
129 } catch (Exception e)
130 {
131
132 } finally
133 {
134 lock.unlock();
135 }
136 }
137
138 @Override
139 public void onEntryAdded(String path)
140 {
141 try
142 {
143 lock.lock();
144 condition.signalAll();
145 } catch (Exception e)
146 {
147
148 } finally
149 {
150 lock.unlock();
151 }
152 }
153
154 @Override
155 public void onEntryDeleted(String path)
156 {
157 try
158 {
159 lock.lock();
160 condition.signalAll();
161 } catch (Exception e)
162 {
163
164 } finally
165 {
166 lock.unlock();
167 }
168 }
169 }