View Javadoc

1   package org.apache.helix.filestore;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.BufferedOutputStream;
23  import java.io.DataOutputStream;
24  import java.io.File;
25  import java.io.FileNotFoundException;
26  import java.io.FileOutputStream;
27  import java.nio.charset.Charset;
28  import java.util.concurrent.locks.Lock;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.apache.helix.filestore.FileSystemWatchService.ChangeType;
32  
33  public class ChangeLogGenerator implements FileChangeWatcher
34  {
35    Lock lock;
36    private long currentSeq;
37    private long currentGen;
38    private int entriesLogged;
39    private DataOutputStream out;
40    private final String directory;
41  
42    public ChangeLogGenerator(String directory, long startGen, long startSeq)
43        throws Exception
44    {
45      this.directory = directory;
46      lock = new ReentrantLock();
47      currentSeq = startSeq;
48      currentGen = startGen;
49      setLogFile();
50    }
51  
52    private void setLogFile() throws Exception
53    {
54      File file = new File(directory);
55      String[] list = file.list();
56      if(list==null){
57        list = new String[]{};
58      }
59      int max = 1;
60      for (String name : list)
61      {
62        String[] split = name.split("\\.");
63        if (split.length == 2)
64        {
65          try
66          {
67            int index = Integer.parseInt(split[1]);
68            if (index > max)
69            {
70              max = index;
71            }
72          } catch (NumberFormatException e)
73          {
74            System.err.println("Invalid transaction log file found:" + name);
75          }
76        }
77      }
78      
79      String transLogFile = directory+"/"+
80          "log." + (max);
81      System.out.println("Current file name:"+ transLogFile);
82      out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(transLogFile,true)));
83    }
84  
85    @Override
86    public void onEntryAdded(String path)
87    {
88      appendChange(path, FileSystemWatchService.ChangeType.CREATE);
89  
90    }
91  
92    @Override
93    public void onEntryDeleted(String path)
94    {
95      appendChange(path, FileSystemWatchService.ChangeType.DELETE);
96  
97    }
98  
99    @Override
100   public void onEntryModified(String path)
101   {
102 
103     appendChange(path, FileSystemWatchService.ChangeType.MODIFY);
104 
105   }
106 
107   public boolean appendChange(String path, ChangeType type)
108   {
109     lock.lock();
110     if(new File(path).isDirectory()){
111       return true;
112     }
113     try
114     {
115       ChangeRecord record = new ChangeRecord();
116       record.file = path;
117       record.timestamp = System.currentTimeMillis();
118       currentSeq++;
119       long txnId = (((long)currentGen) << 32) + ((long)currentSeq);
120       record.txid = txnId;
121       record.type = (short) type.ordinal();
122       write(record);
123     } catch (Exception e)
124     {
125       e.printStackTrace();
126       return false;
127     } finally
128     {
129       lock.unlock();
130     }
131     return true;
132   }
133 
134   private void write(ChangeRecord record) throws Exception
135   {
136     out.writeLong(record.txid);
137     out.writeShort(record.type);
138     out.writeLong(record.timestamp);
139     out.writeUTF(record.file);
140     out.flush();
141     entriesLogged++;
142     if(entriesLogged==10000){
143       entriesLogged=0;
144       out.close();
145       setLogFile();
146     }
147   }
148 
149 }