1 package org.apache.helix.filestore;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.List;
23
24 import org.apache.helix.HelixManager;
25 import org.apache.helix.HelixManagerFactory;
26 import org.apache.helix.InstanceType;
27 import org.apache.helix.manager.zk.ZKHelixAdmin;
28 import org.apache.helix.manager.zk.ZNRecordSerializer;
29 import org.apache.helix.manager.zk.ZkClient;
30 import org.apache.helix.model.InstanceConfig;
31 import org.apache.helix.participant.StateMachineEngine;
32
33 public class FileStore
34 {
35 private final String _zkAddr;
36 private final String _clusterName;
37 private final String _serverId;
38 private HelixManager _manager = null;
39
40 public FileStore(String zkAddr, String clusterName, String serverId)
41 {
42 _zkAddr = zkAddr;
43 _clusterName = clusterName;
44 _serverId = serverId;
45 }
46
47 public void connect()
48 {
49 try
50 {
51 _manager =
52 HelixManagerFactory.getZKHelixManager(_clusterName,
53 _serverId,
54 InstanceType.PARTICIPANT,
55 _zkAddr);
56
57 StateMachineEngine stateMach = _manager.getStateMachineEngine();
58 FileStoreStateModelFactory modelFactory =
59 new FileStoreStateModelFactory(_manager);
60 stateMach.registerStateModelFactory(SetupCluster.DEFAULT_STATE_MODEL, modelFactory);
61 _manager.connect();
62
63 Thread.currentThread().join();
64 }
65 catch (InterruptedException e)
66 {
67 System.err.println(" [-] " + _serverId + " is interrupted ...");
68 }
69 catch (Exception e)
70 {
71 e.printStackTrace();
72 }
73 finally
74 {
75 disconnect();
76 }
77 }
78
79 public void disconnect()
80 {
81 if (_manager != null)
82 {
83 _manager.disconnect();
84 }
85 }
86
87 public static void main(String[] args) throws Exception
88 {
89 if (args.length < 2)
90 {
91 System.err.println("USAGE: java FileStore zookeeperAddress(e.g. localhost:2181) serverId(host_port)");
92 System.exit(1);
93 }
94
95 final String zkAddr = args[0];
96 final String clusterName = SetupCluster.DEFAULT_CLUSTER_NAME;
97 final String serverId = args[1];
98
99 ZkClient zkclient = null;
100 try
101 {
102
103 final FileStore store =
104 new FileStore(zkAddr, clusterName, serverId);
105
106 Runtime.getRuntime().addShutdownHook(new Thread()
107 {
108 @Override
109 public void run()
110 {
111 System.out.println("Shutting down server:" + serverId);
112 store.disconnect();
113 }
114 });
115 store.connect();
116 }
117 finally
118 {
119 if (zkclient != null)
120 {
121 zkclient.close();
122 }
123 }
124 }
125 }