View Javadoc

1   package org.apache.helix.filestore;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.List;
23  
24  import org.apache.helix.HelixManager;
25  import org.apache.helix.HelixManagerFactory;
26  import org.apache.helix.InstanceType;
27  import org.apache.helix.manager.zk.ZKHelixAdmin;
28  import org.apache.helix.manager.zk.ZNRecordSerializer;
29  import org.apache.helix.manager.zk.ZkClient;
30  import org.apache.helix.model.InstanceConfig;
31  import org.apache.helix.participant.StateMachineEngine;
32  
33  public class FileStore
34  {
35    private final String _zkAddr;
36    private final String _clusterName;
37    private final String _serverId;
38    private HelixManager _manager = null;
39  
40    public FileStore(String zkAddr, String clusterName, String serverId)
41    {
42      _zkAddr = zkAddr;
43      _clusterName = clusterName;
44      _serverId = serverId;
45    }
46  
47    public void connect()
48    {
49      try
50      {
51        _manager =
52            HelixManagerFactory.getZKHelixManager(_clusterName,
53                                                  _serverId,
54                                                  InstanceType.PARTICIPANT,
55                                                  _zkAddr);
56  
57        StateMachineEngine stateMach = _manager.getStateMachineEngine();
58        FileStoreStateModelFactory modelFactory =
59            new FileStoreStateModelFactory(_manager);
60        stateMach.registerStateModelFactory(SetupCluster.DEFAULT_STATE_MODEL, modelFactory);
61        _manager.connect();
62  //      _manager.addExternalViewChangeListener(replicator);
63        Thread.currentThread().join();
64      }
65      catch (InterruptedException e)
66      {
67        System.err.println(" [-] " + _serverId + " is interrupted ...");
68      }
69      catch (Exception e)
70      {
71        e.printStackTrace();
72      }
73      finally
74      {
75        disconnect();
76      }
77    }
78  
79    public void disconnect()
80    {
81      if (_manager != null)
82      {
83        _manager.disconnect();
84      }
85    }
86  
87    public static void main(String[] args) throws Exception
88    {
89      if (args.length < 2)
90      {
91        System.err.println("USAGE: java FileStore zookeeperAddress(e.g. localhost:2181) serverId(host_port)");
92        System.exit(1);
93      }
94  
95      final String zkAddr = args[0];
96      final String clusterName = SetupCluster.DEFAULT_CLUSTER_NAME;
97      final String serverId = args[1];
98  
99      ZkClient zkclient = null;
100     try
101     {
102       // start consumer
103       final FileStore store =
104           new FileStore(zkAddr, clusterName, serverId);
105 
106       Runtime.getRuntime().addShutdownHook(new Thread()
107       {
108         @Override
109         public void run()
110         {
111           System.out.println("Shutting down server:" + serverId);
112           store.disconnect();
113         }
114       });
115       store.connect();
116     }
117     finally
118     {
119       if (zkclient != null)
120       {
121         zkclient.close();
122       }
123     }
124   }
125 }