View Javadoc

1   package org.apache.helix.lockmanager;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.util.Map;
24  import java.util.TreeSet;
25  
26  import org.I0Itec.zkclient.IDefaultNameSpace;
27  import org.I0Itec.zkclient.ZkClient;
28  import org.I0Itec.zkclient.ZkServer;
29  import org.apache.commons.io.FileUtils;
30  import org.apache.helix.HelixAdmin;
31  import org.apache.helix.HelixManager;
32  import org.apache.helix.controller.HelixControllerMain;
33  import org.apache.helix.manager.zk.ZKHelixAdmin;
34  import org.apache.helix.model.ExternalView;
35  import org.apache.helix.model.StateModelDefinition;
36  import org.apache.helix.model.IdealState.IdealStateModeProperty;
37  import org.apache.helix.tools.StateModelConfigGenerator;
38  
39  public class LockManagerDemo
40  {
41    /**
42     * LockManagerDemo clusterName, numInstances, lockGroupName, numLocks
43     * 
44     * @param args
45     * @throws Exception
46     */
47    public static void main(String[] args) throws Exception
48    {
49      final String zkAddress = "localhost:2199";
50      final String clusterName = "lock-manager-demo";
51      final String lockGroupName = "lock-group";
52      final int numInstances = 3;
53      final int numPartitions = 12;
54      final boolean startController = false;
55      HelixManager controllerManager = null;
56      Thread[] processArray;
57      processArray = new Thread[numInstances];
58      try
59      {
60        startLocalZookeeper(2199);
61        HelixAdmin admin = new ZKHelixAdmin(zkAddress);
62        admin.addCluster(clusterName, true);
63        StateModelConfigGenerator generator = new StateModelConfigGenerator();
64        admin.addStateModelDef(clusterName, "OnlineOffline",
65            new StateModelDefinition(generator.generateConfigForOnlineOffline()));
66        admin.addResource(clusterName, lockGroupName, numPartitions,
67            "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE.toString());
68        admin.rebalance(clusterName, lockGroupName, 1);
69        for (int i = 0; i < numInstances; i++)
70        {
71          final String instanceName = "localhost_" + (12000 + i);
72          processArray[i] = new Thread(new Runnable()
73          {
74  
75            @Override
76            public void run()
77            {
78              LockProcess lockProcess = null;
79  
80              try
81              {
82                lockProcess = new LockProcess(clusterName, zkAddress,
83                    instanceName, startController);
84                lockProcess.start();
85                Thread.currentThread().join();
86              } catch (InterruptedException e)
87              {
88                System.out.println(instanceName + "Interrupted");
89                if (lockProcess != null)
90                {
91                  lockProcess.stop();
92                }
93              } catch (Exception e)
94              {
95                e.printStackTrace();
96              }
97            }
98  
99          });
100         processArray[i].start();
101       }
102       Thread.sleep(3000);
103       controllerManager = HelixControllerMain.startHelixController(zkAddress,
104           clusterName, "controller", HelixControllerMain.STANDALONE);
105       Thread.sleep(5000);
106       printStatus(admin, clusterName, lockGroupName);
107       System.out.println("Stopping localhost_12000");
108       processArray[0].interrupt();
109       Thread.sleep(3000);
110       printStatus(admin, clusterName, lockGroupName);
111       Thread.currentThread().join();
112     } catch (Exception e)
113     {
114       e.printStackTrace();
115     } finally
116     {
117       if (controllerManager != null)
118       {
119         controllerManager.disconnect();
120       }
121       for (Thread process : processArray)
122       {
123         if (process != null)
124         {
125           process.interrupt();
126         }
127       }
128     }
129   }
130 
131   private static void printStatus(HelixAdmin admin, String cluster,
132       String resource)
133   {
134     ExternalView externalView = admin
135         .getResourceExternalView(cluster, resource);
136     //System.out.println(externalView);
137     TreeSet<String> treeSet = new TreeSet<String>(
138         externalView.getPartitionSet());
139     System.out.println("lockName" + "\t" + "acquired By");
140     System.out.println("======================================");
141     for (String lockName : treeSet)
142     {
143       Map<String, String> stateMap = externalView.getStateMap(lockName);
144       String acquiredBy = null;
145       if (stateMap != null)
146       {
147         for(String instanceName:stateMap.keySet()){
148           if ("ONLINE".equals(stateMap.get(instanceName))){
149             acquiredBy = instanceName;
150             break;
151           }
152         }
153       }
154       System.out.println(lockName + "\t"
155           + ((acquiredBy != null) ? acquiredBy : "NONE"));
156     }
157   }
158 
159   private static void startLocalZookeeper(int port) throws Exception
160   {
161     ZkServer server = null;
162     String baseDir = "/tmp/IntegrationTest/";
163     final String dataDir = baseDir + "zk/dataDir";
164     final String logDir = baseDir + "/tmp/logDir";
165     FileUtils.deleteDirectory(new File(dataDir));
166     FileUtils.deleteDirectory(new File(logDir));
167 
168     IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
169     {
170       @Override
171       public void createDefaultNameSpace(ZkClient zkClient)
172       {
173 
174       }
175     };
176     int zkPort = 2199;
177     final String zkAddress = "localhost:" + zkPort;
178 
179     server = new ZkServer(dataDir, logDir, defaultNameSpace, zkPort);
180     server.start();
181 
182   }
183 
184 }