View Javadoc

1   package org.apache.helix.filestore;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.List;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  
25  import org.apache.helix.NotificationContext;
26  import org.apache.helix.ZNRecord;
27  import org.apache.helix.model.ExternalView;
28  import org.apache.helix.model.InstanceConfig;
29  import org.apache.helix.spectator.RoutingTableProvider;
30  
31  public class Replicator extends RoutingTableProvider
32  {
33    private InstanceConfig currentMasterConfig;
34    private final InstanceConfig localInstanceConfig;
35  
36    private final String partition;
37    private final String resourceName;
38    AtomicBoolean isReplicationInitiated;
39    AtomicBoolean isReplicationStarted;
40    RsyncInvoker rsyncInvoker;
41    private ChangeLogProcessor processor;
42    private FileSystemWatchService watchService;
43    private ChangeLogReader reader;
44  
45    public void setRsyncInvoker(RsyncInvoker rsyncInvoker)
46    {
47      this.rsyncInvoker = rsyncInvoker;
48    }
49  
50    public Replicator(InstanceConfig localInstanceConfig, String resourceName,
51        String partition)
52    {
53      this.localInstanceConfig = localInstanceConfig;
54      this.resourceName = resourceName;
55      this.partition = partition;
56      isReplicationInitiated = new AtomicBoolean(false);
57      isReplicationStarted = new AtomicBoolean(false);
58  
59    }
60  
61    public void start() throws Exception
62    {
63  //System.out.println("Starting replication for ");
64      isReplicationInitiated.set(true);
65  
66      List<InstanceConfig> instances = getInstances(resourceName, partition,
67          "MASTER");
68      if (instances.size() > 0)
69      {
70        if (instances.size() == 1)
71        {
72          InstanceConfig newMasterConfig = instances.get(0);
73          String master = newMasterConfig.getInstanceName();
74          if (currentMasterConfig == null
75              || !master.equalsIgnoreCase(currentMasterConfig.getInstanceName()))
76          {
77            System.out.println("Found new master:"
78                + newMasterConfig.getInstanceName());
79            if(currentMasterConfig!=null){
80              stop();
81            }
82            currentMasterConfig = newMasterConfig;
83            startReplication(currentMasterConfig);
84          } else
85          {
86            System.out.println("Already replicating from " + master);
87          }
88        } else
89        {
90          System.out.println("Invalid number of masters found:" + instances);
91        }
92      } else
93      {
94        System.out.println("No master found");
95      }
96    }
97  
98    public void startReplication(InstanceConfig masterInstanceConfig)
99        throws Exception
100   {
101     String remoteHost = masterInstanceConfig.getHostName();
102     String remoteChangeLogDir = masterInstanceConfig.getRecord()
103         .getSimpleField("change_log_dir");
104     String remoteFilestoreDir = masterInstanceConfig.getRecord()
105         .getSimpleField("file_store_dir");
106 
107     String localChangeLogDir = localInstanceConfig.getRecord().getSimpleField(
108         "change_log_dir");
109     String localFilestoreDir = localInstanceConfig.getRecord().getSimpleField(
110         "file_store_dir");
111     String localcheckpointDir = localInstanceConfig.getRecord().getSimpleField(
112         "check_point_dir");
113     // setup rsync for the change log directory
114     setupRsync(remoteHost, remoteChangeLogDir, localChangeLogDir);
115     reader = new ChangeLogReader(localChangeLogDir);
116     watchService = new FileSystemWatchService(localChangeLogDir, reader);
117     processor = new ChangeLogProcessor(reader, remoteHost, remoteFilestoreDir,
118         localFilestoreDir, localcheckpointDir);
119     watchService.start();
120     processor.start();
121     isReplicationStarted.set(true);
122   }
123 
124   private void setupRsync(String remoteHost, String remoteBaseDir,
125       String localBaseDir) throws Exception
126   {
127     rsyncInvoker = new RsyncInvoker(remoteHost, remoteBaseDir, localBaseDir);
128     boolean started = rsyncInvoker.runInBackground();
129     if (started)
130     {
131       System.out.println("Rsync thread started in background");
132     } else
133     {
134       throw new Exception("Unable to start rsync thread");
135     }
136   }
137 
138   public void stop()
139   {
140     if (isReplicationStarted.get())
141     {
142       System.out.println("Stopping replication from current master:"+ currentMasterConfig.getInstanceName());
143       rsyncInvoker.stop();
144       watchService.stop();
145       processor.stop();
146     }
147     isReplicationInitiated.set(false);
148   }
149 
150   @Override
151   public void onExternalViewChange(List<ExternalView> viewList,
152       NotificationContext context)
153   {
154     super.onExternalViewChange(viewList, context);
155 
156     if (isReplicationInitiated.get())
157     {
158       try
159       {
160         start();
161       } catch (Exception e)
162       {
163         e.printStackTrace();
164       }
165     }
166   }
167 
168   public static void main(String[] args) throws Exception
169   {
170     InstanceConfig localInstanceConfig = new InstanceConfig("localhost_12001");
171     ZNRecord record = localInstanceConfig.getRecord();
172     record.setSimpleField("change_log_dir", "data/localhost_12001/translog");
173     record.setSimpleField("file_store_dir", "data/localhost_12001/filestore");
174     record.setSimpleField("check_point_dir", "data/localhost_12001/checkpoint");
175     InstanceConfig masterInstanceConfig = new InstanceConfig("localhost_12001");
176     record = masterInstanceConfig.getRecord();
177     record.setSimpleField("change_log_dir", "data/localhost_12000/translog");
178     record.setSimpleField("file_store_dir", "data/localhost_12000/filestore");
179     record.setSimpleField("check_point_dir", "data/localhost_12000/checkpoint");
180     Replicator replicator = new Replicator(localInstanceConfig, "resource",
181         "partition");
182     replicator.startReplication(masterInstanceConfig);
183   }
184 
185 }