1 package org.apache.helix.lockmanager;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.File;
23 import java.util.Map;
24 import java.util.TreeSet;
25
26 import org.I0Itec.zkclient.IDefaultNameSpace;
27 import org.I0Itec.zkclient.ZkClient;
28 import org.I0Itec.zkclient.ZkServer;
29 import org.apache.commons.io.FileUtils;
30 import org.apache.helix.HelixAdmin;
31 import org.apache.helix.HelixManager;
32 import org.apache.helix.controller.HelixControllerMain;
33 import org.apache.helix.manager.zk.ZKHelixAdmin;
34 import org.apache.helix.model.ExternalView;
35 import org.apache.helix.model.StateModelDefinition;
36 import org.apache.helix.model.IdealState.IdealStateModeProperty;
37 import org.apache.helix.tools.StateModelConfigGenerator;
38
39 public class LockManagerDemo
40 {
41
42
43
44
45
46
47 public static void main(String[] args) throws Exception
48 {
49 final String zkAddress = "localhost:2199";
50 final String clusterName = "lock-manager-demo";
51 final String lockGroupName = "lock-group";
52 final int numInstances = 3;
53 final int numPartitions = 12;
54 final boolean startController = false;
55 HelixManager controllerManager = null;
56 Thread[] processArray;
57 processArray = new Thread[numInstances];
58 try
59 {
60 startLocalZookeeper(2199);
61 HelixAdmin admin = new ZKHelixAdmin(zkAddress);
62 admin.addCluster(clusterName, true);
63 StateModelConfigGenerator generator = new StateModelConfigGenerator();
64 admin.addStateModelDef(clusterName, "OnlineOffline",
65 new StateModelDefinition(generator.generateConfigForOnlineOffline()));
66 admin.addResource(clusterName, lockGroupName, numPartitions,
67 "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE.toString());
68 admin.rebalance(clusterName, lockGroupName, 1);
69 for (int i = 0; i < numInstances; i++)
70 {
71 final String instanceName = "localhost_" + (12000 + i);
72 processArray[i] = new Thread(new Runnable()
73 {
74
75 @Override
76 public void run()
77 {
78 LockProcess lockProcess = null;
79
80 try
81 {
82 lockProcess = new LockProcess(clusterName, zkAddress,
83 instanceName, startController);
84 lockProcess.start();
85 Thread.currentThread().join();
86 } catch (InterruptedException e)
87 {
88 System.out.println(instanceName + "Interrupted");
89 if (lockProcess != null)
90 {
91 lockProcess.stop();
92 }
93 } catch (Exception e)
94 {
95 e.printStackTrace();
96 }
97 }
98
99 });
100 processArray[i].start();
101 }
102 Thread.sleep(3000);
103 controllerManager = HelixControllerMain.startHelixController(zkAddress,
104 clusterName, "controller", HelixControllerMain.STANDALONE);
105 Thread.sleep(5000);
106 printStatus(admin, clusterName, lockGroupName);
107 System.out.println("Stopping localhost_12000");
108 processArray[0].interrupt();
109 Thread.sleep(3000);
110 printStatus(admin, clusterName, lockGroupName);
111 Thread.currentThread().join();
112 } catch (Exception e)
113 {
114 e.printStackTrace();
115 } finally
116 {
117 if (controllerManager != null)
118 {
119 controllerManager.disconnect();
120 }
121 for (Thread process : processArray)
122 {
123 if (process != null)
124 {
125 process.interrupt();
126 }
127 }
128 }
129 }
130
131 private static void printStatus(HelixAdmin admin, String cluster,
132 String resource)
133 {
134 ExternalView externalView = admin
135 .getResourceExternalView(cluster, resource);
136
137 TreeSet<String> treeSet = new TreeSet<String>(
138 externalView.getPartitionSet());
139 System.out.println("lockName" + "\t" + "acquired By");
140 System.out.println("======================================");
141 for (String lockName : treeSet)
142 {
143 Map<String, String> stateMap = externalView.getStateMap(lockName);
144 String acquiredBy = null;
145 if (stateMap != null)
146 {
147 for(String instanceName:stateMap.keySet()){
148 if ("ONLINE".equals(stateMap.get(instanceName))){
149 acquiredBy = instanceName;
150 break;
151 }
152 }
153 }
154 System.out.println(lockName + "\t"
155 + ((acquiredBy != null) ? acquiredBy : "NONE"));
156 }
157 }
158
159 private static void startLocalZookeeper(int port) throws Exception
160 {
161 ZkServer server = null;
162 String baseDir = "/tmp/IntegrationTest/";
163 final String dataDir = baseDir + "zk/dataDir";
164 final String logDir = baseDir + "/tmp/logDir";
165 FileUtils.deleteDirectory(new File(dataDir));
166 FileUtils.deleteDirectory(new File(logDir));
167
168 IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
169 {
170 @Override
171 public void createDefaultNameSpace(ZkClient zkClient)
172 {
173
174 }
175 };
176 int zkPort = 2199;
177 final String zkAddress = "localhost:" + zkPort;
178
179 server = new ZkServer(dataDir, logDir, defaultNameSpace, zkPort);
180 server.start();
181
182 }
183
184 }