1 package org.apache.helix.filestore;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.List;
23 import java.util.concurrent.atomic.AtomicBoolean;
24
25 import org.apache.helix.NotificationContext;
26 import org.apache.helix.ZNRecord;
27 import org.apache.helix.model.ExternalView;
28 import org.apache.helix.model.InstanceConfig;
29 import org.apache.helix.spectator.RoutingTableProvider;
30
31 public class Replicator extends RoutingTableProvider
32 {
33 private InstanceConfig currentMasterConfig;
34 private final InstanceConfig localInstanceConfig;
35
36 private final String partition;
37 private final String resourceName;
38 AtomicBoolean isReplicationInitiated;
39 AtomicBoolean isReplicationStarted;
40 RsyncInvoker rsyncInvoker;
41 private ChangeLogProcessor processor;
42 private FileSystemWatchService watchService;
43 private ChangeLogReader reader;
44
45 public void setRsyncInvoker(RsyncInvoker rsyncInvoker)
46 {
47 this.rsyncInvoker = rsyncInvoker;
48 }
49
50 public Replicator(InstanceConfig localInstanceConfig, String resourceName,
51 String partition)
52 {
53 this.localInstanceConfig = localInstanceConfig;
54 this.resourceName = resourceName;
55 this.partition = partition;
56 isReplicationInitiated = new AtomicBoolean(false);
57 isReplicationStarted = new AtomicBoolean(false);
58
59 }
60
61 public void start() throws Exception
62 {
63
64 isReplicationInitiated.set(true);
65
66 List<InstanceConfig> instances = getInstances(resourceName, partition,
67 "MASTER");
68 if (instances.size() > 0)
69 {
70 if (instances.size() == 1)
71 {
72 InstanceConfig newMasterConfig = instances.get(0);
73 String master = newMasterConfig.getInstanceName();
74 if (currentMasterConfig == null
75 || !master.equalsIgnoreCase(currentMasterConfig.getInstanceName()))
76 {
77 System.out.println("Found new master:"
78 + newMasterConfig.getInstanceName());
79 if(currentMasterConfig!=null){
80 stop();
81 }
82 currentMasterConfig = newMasterConfig;
83 startReplication(currentMasterConfig);
84 } else
85 {
86 System.out.println("Already replicating from " + master);
87 }
88 } else
89 {
90 System.out.println("Invalid number of masters found:" + instances);
91 }
92 } else
93 {
94 System.out.println("No master found");
95 }
96 }
97
98 public void startReplication(InstanceConfig masterInstanceConfig)
99 throws Exception
100 {
101 String remoteHost = masterInstanceConfig.getHostName();
102 String remoteChangeLogDir = masterInstanceConfig.getRecord()
103 .getSimpleField("change_log_dir");
104 String remoteFilestoreDir = masterInstanceConfig.getRecord()
105 .getSimpleField("file_store_dir");
106
107 String localChangeLogDir = localInstanceConfig.getRecord().getSimpleField(
108 "change_log_dir");
109 String localFilestoreDir = localInstanceConfig.getRecord().getSimpleField(
110 "file_store_dir");
111 String localcheckpointDir = localInstanceConfig.getRecord().getSimpleField(
112 "check_point_dir");
113
114 setupRsync(remoteHost, remoteChangeLogDir, localChangeLogDir);
115 reader = new ChangeLogReader(localChangeLogDir);
116 watchService = new FileSystemWatchService(localChangeLogDir, reader);
117 processor = new ChangeLogProcessor(reader, remoteHost, remoteFilestoreDir,
118 localFilestoreDir, localcheckpointDir);
119 watchService.start();
120 processor.start();
121 isReplicationStarted.set(true);
122 }
123
124 private void setupRsync(String remoteHost, String remoteBaseDir,
125 String localBaseDir) throws Exception
126 {
127 rsyncInvoker = new RsyncInvoker(remoteHost, remoteBaseDir, localBaseDir);
128 boolean started = rsyncInvoker.runInBackground();
129 if (started)
130 {
131 System.out.println("Rsync thread started in background");
132 } else
133 {
134 throw new Exception("Unable to start rsync thread");
135 }
136 }
137
138 public void stop()
139 {
140 if (isReplicationStarted.get())
141 {
142 System.out.println("Stopping replication from current master:"+ currentMasterConfig.getInstanceName());
143 rsyncInvoker.stop();
144 watchService.stop();
145 processor.stop();
146 }
147 isReplicationInitiated.set(false);
148 }
149
150 @Override
151 public void onExternalViewChange(List<ExternalView> viewList,
152 NotificationContext context)
153 {
154 super.onExternalViewChange(viewList, context);
155
156 if (isReplicationInitiated.get())
157 {
158 try
159 {
160 start();
161 } catch (Exception e)
162 {
163 e.printStackTrace();
164 }
165 }
166 }
167
168 public static void main(String[] args) throws Exception
169 {
170 InstanceConfig localInstanceConfig = new InstanceConfig("localhost_12001");
171 ZNRecord record = localInstanceConfig.getRecord();
172 record.setSimpleField("change_log_dir", "data/localhost_12001/translog");
173 record.setSimpleField("file_store_dir", "data/localhost_12001/filestore");
174 record.setSimpleField("check_point_dir", "data/localhost_12001/checkpoint");
175 InstanceConfig masterInstanceConfig = new InstanceConfig("localhost_12001");
176 record = masterInstanceConfig.getRecord();
177 record.setSimpleField("change_log_dir", "data/localhost_12000/translog");
178 record.setSimpleField("file_store_dir", "data/localhost_12000/filestore");
179 record.setSimpleField("check_point_dir", "data/localhost_12000/checkpoint");
180 Replicator replicator = new Replicator(localInstanceConfig, "resource",
181 "partition");
182 replicator.startReplication(masterInstanceConfig);
183 }
184
185 }