View Javadoc

1   package org.apache.helix.filestore;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.util.Arrays;
25  import java.util.HashMap;
26  import java.util.Map;
27  
28  import org.I0Itec.zkclient.IDefaultNameSpace;
29  import org.I0Itec.zkclient.ZkClient;
30  import org.I0Itec.zkclient.ZkServer;
31  import org.apache.commons.io.FileUtils;
32  import org.apache.helix.HelixDataAccessor;
33  import org.apache.helix.HelixManager;
34  import org.apache.helix.PropertyKey.Builder;
35  import org.apache.helix.controller.HelixControllerMain;
36  import org.apache.helix.model.HelixConfigScope;
37  import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
38  import org.apache.helix.model.builder.HelixConfigScopeBuilder;
39  import org.apache.helix.tools.ClusterSetup;
40  
41  public class IntegrationTest
42  {
43  
44    public static void main(String[] args) throws InterruptedException
45    {
46      ZkServer server = null;
47      ;
48  
49      try
50      {
51        String baseDir = "/tmp/IntegrationTest/";
52        final String dataDir = baseDir + "zk/dataDir";
53        final String logDir = baseDir + "/tmp/logDir";
54        FileUtils.deleteDirectory(new File(dataDir));
55        FileUtils.deleteDirectory(new File(logDir));
56  
57        IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
58        {
59          @Override
60          public void createDefaultNameSpace(ZkClient zkClient)
61          {
62  
63          }
64        };
65        int zkPort = 2199;
66        final String zkAddress = "localhost:" + zkPort;
67  
68        server = new ZkServer(dataDir, logDir, defaultNameSpace, zkPort);
69        server.start();
70        ClusterSetup setup = new ClusterSetup(zkAddress);
71        final String clusterName = "file-store-test";
72        setup.deleteCluster(clusterName);
73        setup.addCluster(clusterName, true);
74        setup.addInstanceToCluster(clusterName, "localhost_12001");
75        setup.addInstanceToCluster(clusterName, "localhost_12002");
76        setup.addInstanceToCluster(clusterName, "localhost_12003");
77        setup.addResourceToCluster(clusterName, "repository", 1, "MasterSlave");
78        setup.rebalanceResource(clusterName, "repository", 3);
79        // Set the configuration
80        final String instanceName1 = "localhost_12001";
81        addConfiguration(setup, baseDir, clusterName, instanceName1);
82        final String instanceName2 = "localhost_12002";
83        addConfiguration(setup, baseDir, clusterName, instanceName2);
84        final String instanceName3 = "localhost_12003";
85        addConfiguration(setup, baseDir, clusterName, instanceName3);
86        Thread thread1 = new Thread(new Runnable()
87        {
88          @Override
89          public void run()
90          {
91            FileStore fileStore = null;
92  
93            try
94            {
95              fileStore = new FileStore(zkAddress, clusterName, instanceName1);
96              fileStore.connect();
97            } catch (Exception e)
98            {
99              System.err.println("Exception" + e);
100             fileStore.disconnect();
101           }
102         }
103 
104       });
105       // START NODES
106       Thread thread2 = new Thread(new Runnable()
107       {
108 
109         @Override
110         public void run()
111         {
112           FileStore fileStore = new FileStore(zkAddress, clusterName,
113               instanceName2);
114           fileStore.connect();
115         }
116       });
117       // START NODES
118       Thread thread3 = new Thread(new Runnable()
119       {
120 
121         @Override
122         public void run()
123         {
124           FileStore fileStore = new FileStore(zkAddress, clusterName,
125               instanceName3);
126           fileStore.connect();
127         }
128       });
129       System.out.println("STARTING NODES");
130       thread1.start();
131       thread2.start();
132       thread3.start();
133 
134       // Start Controller
135       final HelixManager manager = HelixControllerMain.startHelixController(
136           zkAddress, clusterName, "controller", HelixControllerMain.STANDALONE);
137       Thread.sleep(5000);
138       printStatus(manager);
139       listFiles(baseDir);
140       System.out.println("Writing files a.txt and b.txt to current master "
141           + baseDir + "localhost_12001" + "/filestore");
142       FileUtils.writeStringToFile(new File(baseDir + "localhost_12001"
143           + "/filestore/a.txt"), "some_data in a");
144       FileUtils.writeStringToFile(new File(baseDir + "localhost_12001"
145           + "/filestore/b.txt"), "some_data in b");
146       Thread.sleep(10000);
147       listFiles(baseDir);
148       Thread.sleep(5000);
149       System.out.println("Stopping the MASTER node:" + "localhost_12001");
150       thread1.interrupt();
151       Thread.sleep(10000);
152       printStatus(manager);
153       System.out.println("Writing files c.txt and d.txt to current master "
154           + baseDir + "localhost_12002" + "/filestore");
155       FileUtils.writeStringToFile(new File(baseDir + "localhost_12002"
156           + "/filestore/c.txt"), "some_data in c");
157       FileUtils.writeStringToFile(new File(baseDir + "localhost_12002"
158           + "/filestore/d.txt"), "some_data in d");
159       Thread.sleep(10000);
160       listFiles(baseDir);
161       System.out.println("Create or modify any files under " + baseDir
162           + "localhost_12002" + "/filestore"
163           + " and it should get replicated to " + baseDir + "localhost_12003"
164           + "/filestore");
165     } catch (Exception e)
166     {
167       e.printStackTrace();
168     } finally
169     {
170       if (server != null)
171       {
172         // server.shutdown();
173       }
174     }
175     Thread.currentThread().join();
176   }
177 
178   private static void listFiles(String baseDir)
179   {
180     System.out.println("===============FILES===============================");
181     String[] instances = new String[] { "localhost_12001", "localhost_12002",
182         "localhost_12003" };
183     for (String instance : instances)
184     {
185       String dir = baseDir + instance + "/filestore";
186       String[] list = new File(dir).list();
187       System.out.println(dir + ":"
188           + ((list != null) ? Arrays.toString(list) : "NONE"));
189     }
190     System.out.println("===============FILES===============================");
191   }
192 
193   private static void printStatus(final HelixManager manager)
194   {
195     System.out.println("CLUSTER STATUS");
196     HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
197     Builder keyBuilder = helixDataAccessor.keyBuilder();
198     System.out.println("External View \n"
199         + helixDataAccessor.getProperty(keyBuilder.externalView("repository")));
200   }
201 
202   private static void addConfiguration(ClusterSetup setup, String baseDir,
203       String clusterName, String instanceName) throws IOException
204   {
205     Map<String, String> properties = new HashMap<String, String>();
206     HelixConfigScopeBuilder builder = new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT);
207     HelixConfigScope instanceScope = builder.forCluster(clusterName)
208         .forParticipant(instanceName).build();
209     properties.put("change_log_dir", baseDir + instanceName + "/translog");
210     properties.put("file_store_dir", baseDir + instanceName + "/filestore");
211     properties.put("check_point_dir", baseDir + instanceName + "/checkpoint");
212     setup.getClusterManagementTool().setConfig(instanceScope, properties);
213     FileUtils.deleteDirectory(new File(properties.get("change_log_dir")));
214     FileUtils.deleteDirectory(new File(properties.get("file_store_dir")));
215     FileUtils.deleteDirectory(new File(properties.get("check_point_dir")));
216     new File(properties.get("change_log_dir")).mkdirs();
217     new File(properties.get("file_store_dir")).mkdirs();
218     new File(properties.get("check_point_dir")).mkdirs();
219   }
220 }