1 package org.apache.helix.filestore;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.Map;
27
28 import org.I0Itec.zkclient.IDefaultNameSpace;
29 import org.I0Itec.zkclient.ZkClient;
30 import org.I0Itec.zkclient.ZkServer;
31 import org.apache.commons.io.FileUtils;
32 import org.apache.helix.HelixDataAccessor;
33 import org.apache.helix.HelixManager;
34 import org.apache.helix.PropertyKey.Builder;
35 import org.apache.helix.controller.HelixControllerMain;
36 import org.apache.helix.model.HelixConfigScope;
37 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
38 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
39 import org.apache.helix.tools.ClusterSetup;
40
41 public class IntegrationTest
42 {
43
44 public static void main(String[] args) throws InterruptedException
45 {
46 ZkServer server = null;
47 ;
48
49 try
50 {
51 String baseDir = "/tmp/IntegrationTest/";
52 final String dataDir = baseDir + "zk/dataDir";
53 final String logDir = baseDir + "/tmp/logDir";
54 FileUtils.deleteDirectory(new File(dataDir));
55 FileUtils.deleteDirectory(new File(logDir));
56
57 IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
58 {
59 @Override
60 public void createDefaultNameSpace(ZkClient zkClient)
61 {
62
63 }
64 };
65 int zkPort = 2199;
66 final String zkAddress = "localhost:" + zkPort;
67
68 server = new ZkServer(dataDir, logDir, defaultNameSpace, zkPort);
69 server.start();
70 ClusterSetup setup = new ClusterSetup(zkAddress);
71 final String clusterName = "file-store-test";
72 setup.deleteCluster(clusterName);
73 setup.addCluster(clusterName, true);
74 setup.addInstanceToCluster(clusterName, "localhost_12001");
75 setup.addInstanceToCluster(clusterName, "localhost_12002");
76 setup.addInstanceToCluster(clusterName, "localhost_12003");
77 setup.addResourceToCluster(clusterName, "repository", 1, "MasterSlave");
78 setup.rebalanceResource(clusterName, "repository", 3);
79
80 final String instanceName1 = "localhost_12001";
81 addConfiguration(setup, baseDir, clusterName, instanceName1);
82 final String instanceName2 = "localhost_12002";
83 addConfiguration(setup, baseDir, clusterName, instanceName2);
84 final String instanceName3 = "localhost_12003";
85 addConfiguration(setup, baseDir, clusterName, instanceName3);
86 Thread thread1 = new Thread(new Runnable()
87 {
88 @Override
89 public void run()
90 {
91 FileStore fileStore = null;
92
93 try
94 {
95 fileStore = new FileStore(zkAddress, clusterName, instanceName1);
96 fileStore.connect();
97 } catch (Exception e)
98 {
99 System.err.println("Exception" + e);
100 fileStore.disconnect();
101 }
102 }
103
104 });
105
106 Thread thread2 = new Thread(new Runnable()
107 {
108
109 @Override
110 public void run()
111 {
112 FileStore fileStore = new FileStore(zkAddress, clusterName,
113 instanceName2);
114 fileStore.connect();
115 }
116 });
117
118 Thread thread3 = new Thread(new Runnable()
119 {
120
121 @Override
122 public void run()
123 {
124 FileStore fileStore = new FileStore(zkAddress, clusterName,
125 instanceName3);
126 fileStore.connect();
127 }
128 });
129 System.out.println("STARTING NODES");
130 thread1.start();
131 thread2.start();
132 thread3.start();
133
134
135 final HelixManager manager = HelixControllerMain.startHelixController(
136 zkAddress, clusterName, "controller", HelixControllerMain.STANDALONE);
137 Thread.sleep(5000);
138 printStatus(manager);
139 listFiles(baseDir);
140 System.out.println("Writing files a.txt and b.txt to current master "
141 + baseDir + "localhost_12001" + "/filestore");
142 FileUtils.writeStringToFile(new File(baseDir + "localhost_12001"
143 + "/filestore/a.txt"), "some_data in a");
144 FileUtils.writeStringToFile(new File(baseDir + "localhost_12001"
145 + "/filestore/b.txt"), "some_data in b");
146 Thread.sleep(10000);
147 listFiles(baseDir);
148 Thread.sleep(5000);
149 System.out.println("Stopping the MASTER node:" + "localhost_12001");
150 thread1.interrupt();
151 Thread.sleep(10000);
152 printStatus(manager);
153 System.out.println("Writing files c.txt and d.txt to current master "
154 + baseDir + "localhost_12002" + "/filestore");
155 FileUtils.writeStringToFile(new File(baseDir + "localhost_12002"
156 + "/filestore/c.txt"), "some_data in c");
157 FileUtils.writeStringToFile(new File(baseDir + "localhost_12002"
158 + "/filestore/d.txt"), "some_data in d");
159 Thread.sleep(10000);
160 listFiles(baseDir);
161 System.out.println("Create or modify any files under " + baseDir
162 + "localhost_12002" + "/filestore"
163 + " and it should get replicated to " + baseDir + "localhost_12003"
164 + "/filestore");
165 } catch (Exception e)
166 {
167 e.printStackTrace();
168 } finally
169 {
170 if (server != null)
171 {
172
173 }
174 }
175 Thread.currentThread().join();
176 }
177
178 private static void listFiles(String baseDir)
179 {
180 System.out.println("===============FILES===============================");
181 String[] instances = new String[] { "localhost_12001", "localhost_12002",
182 "localhost_12003" };
183 for (String instance : instances)
184 {
185 String dir = baseDir + instance + "/filestore";
186 String[] list = new File(dir).list();
187 System.out.println(dir + ":"
188 + ((list != null) ? Arrays.toString(list) : "NONE"));
189 }
190 System.out.println("===============FILES===============================");
191 }
192
193 private static void printStatus(final HelixManager manager)
194 {
195 System.out.println("CLUSTER STATUS");
196 HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
197 Builder keyBuilder = helixDataAccessor.keyBuilder();
198 System.out.println("External View \n"
199 + helixDataAccessor.getProperty(keyBuilder.externalView("repository")));
200 }
201
202 private static void addConfiguration(ClusterSetup setup, String baseDir,
203 String clusterName, String instanceName) throws IOException
204 {
205 Map<String, String> properties = new HashMap<String, String>();
206 HelixConfigScopeBuilder builder = new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT);
207 HelixConfigScope instanceScope = builder.forCluster(clusterName)
208 .forParticipant(instanceName).build();
209 properties.put("change_log_dir", baseDir + instanceName + "/translog");
210 properties.put("file_store_dir", baseDir + instanceName + "/filestore");
211 properties.put("check_point_dir", baseDir + instanceName + "/checkpoint");
212 setup.getClusterManagementTool().setConfig(instanceScope, properties);
213 FileUtils.deleteDirectory(new File(properties.get("change_log_dir")));
214 FileUtils.deleteDirectory(new File(properties.get("file_store_dir")));
215 FileUtils.deleteDirectory(new File(properties.get("check_point_dir")));
216 new File(properties.get("change_log_dir")).mkdirs();
217 new File(properties.get("file_store_dir")).mkdirs();
218 new File(properties.get("check_point_dir")).mkdirs();
219 }
220 }