1 package org.apache.helix.filestore;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.BufferedOutputStream;
23 import java.io.DataOutputStream;
24 import java.io.File;
25 import java.io.FileNotFoundException;
26 import java.io.FileOutputStream;
27 import java.nio.charset.Charset;
28 import java.util.concurrent.locks.Lock;
29 import java.util.concurrent.locks.ReentrantLock;
30
31 import org.apache.helix.filestore.FileSystemWatchService.ChangeType;
32
33 public class ChangeLogGenerator implements FileChangeWatcher
34 {
35 Lock lock;
36 private long currentSeq;
37 private long currentGen;
38 private int entriesLogged;
39 private DataOutputStream out;
40 private final String directory;
41
42 public ChangeLogGenerator(String directory, long startGen, long startSeq)
43 throws Exception
44 {
45 this.directory = directory;
46 lock = new ReentrantLock();
47 currentSeq = startSeq;
48 currentGen = startGen;
49 setLogFile();
50 }
51
52 private void setLogFile() throws Exception
53 {
54 File file = new File(directory);
55 String[] list = file.list();
56 if(list==null){
57 list = new String[]{};
58 }
59 int max = 1;
60 for (String name : list)
61 {
62 String[] split = name.split("\\.");
63 if (split.length == 2)
64 {
65 try
66 {
67 int index = Integer.parseInt(split[1]);
68 if (index > max)
69 {
70 max = index;
71 }
72 } catch (NumberFormatException e)
73 {
74 System.err.println("Invalid transaction log file found:" + name);
75 }
76 }
77 }
78
79 String transLogFile = directory+"/"+
80 "log." + (max);
81 System.out.println("Current file name:"+ transLogFile);
82 out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(transLogFile,true)));
83 }
84
85 @Override
86 public void onEntryAdded(String path)
87 {
88 appendChange(path, FileSystemWatchService.ChangeType.CREATE);
89
90 }
91
92 @Override
93 public void onEntryDeleted(String path)
94 {
95 appendChange(path, FileSystemWatchService.ChangeType.DELETE);
96
97 }
98
99 @Override
100 public void onEntryModified(String path)
101 {
102
103 appendChange(path, FileSystemWatchService.ChangeType.MODIFY);
104
105 }
106
107 public boolean appendChange(String path, ChangeType type)
108 {
109 lock.lock();
110 if(new File(path).isDirectory()){
111 return true;
112 }
113 try
114 {
115 ChangeRecord record = new ChangeRecord();
116 record.file = path;
117 record.timestamp = System.currentTimeMillis();
118 currentSeq++;
119 long txnId = (((long)currentGen) << 32) + ((long)currentSeq);
120 record.txid = txnId;
121 record.type = (short) type.ordinal();
122 write(record);
123 } catch (Exception e)
124 {
125 e.printStackTrace();
126 return false;
127 } finally
128 {
129 lock.unlock();
130 }
131 return true;
132 }
133
134 private void write(ChangeRecord record) throws Exception
135 {
136 out.writeLong(record.txid);
137 out.writeShort(record.type);
138 out.writeLong(record.timestamp);
139 out.writeUTF(record.file);
140 out.flush();
141 entriesLogged++;
142 if(entriesLogged==10000){
143 entriesLogged=0;
144 out.close();
145 setLogFile();
146 }
147 }
148
149 }