View Javadoc

1   package org.apache.helix.examples;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.File;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.TreeSet;
27  
28  import org.I0Itec.zkclient.IDefaultNameSpace;
29  import org.I0Itec.zkclient.ZkClient;
30  import org.I0Itec.zkclient.ZkServer;
31  import org.apache.helix.HelixAdmin;
32  import org.apache.helix.HelixManager;
33  import org.apache.helix.HelixManagerFactory;
34  import org.apache.helix.InstanceType;
35  import org.apache.helix.controller.HelixControllerMain;
36  import org.apache.helix.manager.zk.ZKHelixAdmin;
37  import org.apache.helix.manager.zk.ZNRecordSerializer;
38  import org.apache.helix.model.ExternalView;
39  import org.apache.helix.model.InstanceConfig;
40  import org.apache.helix.model.StateModelDefinition;
41  import org.apache.helix.participant.StateMachineEngine;
42  
43  public class Quickstart
44  {
45  
46    private static String ZK_ADDRESS = "localhost:2199";
47    private static String CLUSTER_NAME = "HELIX_QUICKSTART";
48    private static int NUM_NODES = 2;
49    private static final String RESOURCE_NAME = "MyResource";
50    private static final int NUM_PARTITIONS = 6;
51    private static final int NUM_REPLICAS = 2;
52  
53    private static final String STATE_MODEL_NAME = "MyStateModel";
54  
55    // states
56    private static final String SLAVE = "SLAVE";
57    private static final String OFFLINE = "OFFLINE";
58    private static final String MASTER = "MASTER";
59    private static final String DROPPED = "DROPPED";
60  
61    private static List<InstanceConfig> INSTANCE_CONFIG_LIST;
62    private static List<MyProcess> PROCESS_LIST;
63    private static HelixAdmin admin;
64    static
65    {
66      INSTANCE_CONFIG_LIST = new ArrayList<InstanceConfig>();
67      PROCESS_LIST = new ArrayList<Quickstart.MyProcess>();
68      for (int i = 0; i < NUM_NODES; i++)
69      {
70        int port = 12000 + i;
71        InstanceConfig instanceConfig = new InstanceConfig("localhost_" + port);
72        instanceConfig.setHostName("localhost");
73        instanceConfig.setPort("" + port);
74        instanceConfig.setInstanceEnabled(true);
75        INSTANCE_CONFIG_LIST.add(instanceConfig);
76      }
77  
78    }
79  
80    public static void setup()
81    {
82      admin = new ZKHelixAdmin(ZK_ADDRESS);
83      // create cluster
84      echo("Creating cluster: " + CLUSTER_NAME);
85      admin.addCluster(CLUSTER_NAME, true);
86  
87      // Add nodes to the cluster
88      echo("Adding " + NUM_NODES + " participants to the cluster");
89      for (int i = 0; i < NUM_NODES; i++)
90      {
91        admin.addInstance(CLUSTER_NAME, INSTANCE_CONFIG_LIST.get(i));
92        echo("\t Added participant: "
93            + INSTANCE_CONFIG_LIST.get(i).getInstanceName());
94      }
95  
96      // Add a state model
97      StateModelDefinition myStateModel = defineStateModel();
98      echo("Configuring StateModel: " + "MyStateModel  with 1 Master and 1 Slave");
99      admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, myStateModel);
100 
101     // Add a resource with 6 partitions and 2 replicas
102     echo("Adding a resource MyResource: " + "with 6 partitions and 2 replicas");
103     admin.addResource(CLUSTER_NAME, RESOURCE_NAME, NUM_PARTITIONS,
104         STATE_MODEL_NAME, "AUTO");
105     // this will set up the ideal state, it calculates the preference list for
106     // each partition similar to consistent hashing
107     admin.rebalance(CLUSTER_NAME, RESOURCE_NAME, NUM_REPLICAS);
108   }
109 
110   private static StateModelDefinition defineStateModel()
111   {
112     StateModelDefinition.Builder builder = new StateModelDefinition.Builder(
113         STATE_MODEL_NAME);
114     // Add states and their rank to indicate priority. Lower the rank higher the
115     // priority
116     builder.addState(MASTER, 1);
117     builder.addState(SLAVE, 2);
118     builder.addState(OFFLINE);
119     builder.addState(DROPPED);
120     // Set the initial state when the node starts
121     builder.initialState(OFFLINE);
122 
123     // Add transitions between the states.
124     builder.addTransition(OFFLINE, SLAVE);
125     builder.addTransition(SLAVE, OFFLINE);
126     builder.addTransition(SLAVE, MASTER);
127     builder.addTransition(MASTER, SLAVE);
128     builder.addTransition(OFFLINE, DROPPED);
129 
130     // set constraints on states.
131     // static constraint
132     builder.upperBound(MASTER, 1);
133     // dynamic constraint, R means it should be derived based on the replication
134     // factor.
135     builder.dynamicUpperBound(SLAVE, "R");
136 
137     StateModelDefinition statemodelDefinition = builder.build();
138     return statemodelDefinition;
139   }
140 
141   public static void startController()
142   {
143     // start controller
144     echo("Starting Helix Controller");
145     HelixControllerMain.startHelixController(ZK_ADDRESS, CLUSTER_NAME,
146         "localhost_9100", HelixControllerMain.STANDALONE);
147   }
148 
149   public static void startNodes() throws Exception
150   {
151     echo("Starting Participants");
152     for (int i = 0; i < NUM_NODES; i++)
153     {
154       MyProcess process = new MyProcess(INSTANCE_CONFIG_LIST.get(i).getId());
155       PROCESS_LIST.add(process);
156       process.start();
157       echo("\t Started Participant: " + INSTANCE_CONFIG_LIST.get(i).getId());
158     }
159   }
160 
161   public static void startZookeeper()
162   {
163     echo("STARTING Zookeeper at " + ZK_ADDRESS);
164     IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
165     {
166       @Override
167       public void createDefaultNameSpace(ZkClient zkClient)
168       {
169       }
170     };
171     new File("/tmp/helix-quickstart").mkdirs();
172     // start zookeeper
173     ZkServer server = new ZkServer("/tmp/helix-quickstart/dataDir",
174         "/tmp/helix-quickstart/logDir", defaultNameSpace, 2199);
175     server.start();
176   }
177 
178   public static void echo(Object obj)
179   {
180     System.out.println(obj);
181   }
182 
183   public static void main(String[] args) throws Exception
184   {
185     startZookeeper();
186     setup();
187     startNodes();
188     startController();
189     Thread.sleep(5000);
190     printState("After starting 2 nodes");
191     addNode();
192     Thread.sleep(5000);
193     printState("After adding a third node");
194     stopNode();
195     Thread.sleep(5000);
196     printState("After the 3rd node stops/crashes");
197     Thread.currentThread().join();
198     System.exit(0);
199   }
200 
201   private static void addNode() throws Exception
202   {
203 
204     NUM_NODES = NUM_NODES + 1;
205     int port = 12000 + NUM_NODES - 1;
206     InstanceConfig instanceConfig = new InstanceConfig("localhost_" + port);
207     instanceConfig.setHostName("localhost");
208     instanceConfig.setPort("" + port);
209     instanceConfig.setInstanceEnabled(true);
210     echo("ADDING NEW NODE :" + instanceConfig.getInstanceName()+ ". Partitions will move from old nodes to the new node.");
211     admin.addInstance(CLUSTER_NAME, instanceConfig);
212     INSTANCE_CONFIG_LIST.add(instanceConfig);
213     MyProcess process = new MyProcess(instanceConfig.getInstanceName());
214     PROCESS_LIST.add(process);
215     admin.rebalance(CLUSTER_NAME, RESOURCE_NAME, 3);
216     process.start();
217   }
218 
219   private static void stopNode()
220   {
221     int nodeId = NUM_NODES - 1;
222     echo("STOPPING " + INSTANCE_CONFIG_LIST.get(nodeId).getInstanceName()+". Mastership will be transferred to the remaining nodes");
223     PROCESS_LIST.get(nodeId).stop();
224   }
225 
226   private static void printState(String msg)
227   {
228     System.out.println("CLUSTER STATE: "+ msg);
229     ExternalView resourceExternalView = admin.getResourceExternalView(
230         CLUSTER_NAME, RESOURCE_NAME);
231     TreeSet<String> sortedSet = new TreeSet<String>(
232         resourceExternalView.getPartitionSet());
233     StringBuilder sb = new StringBuilder("\t\t");
234     for (int i = 0; i < NUM_NODES; i++)
235     {
236       sb.append(INSTANCE_CONFIG_LIST.get(i).getInstanceName()).append("\t");
237     }
238     System.out.println(sb);
239     for (String partitionName : sortedSet)
240     {
241       sb.delete(0, sb.length() - 1);
242       sb.append(partitionName).append("\t");
243       for (int i = 0; i < NUM_NODES; i++)
244       {
245         Map<String, String> stateMap = resourceExternalView
246             .getStateMap(partitionName);
247         if (stateMap != null
248             && stateMap.containsKey(INSTANCE_CONFIG_LIST.get(i)
249                 .getInstanceName()))
250         {
251           sb.append(
252               stateMap.get(INSTANCE_CONFIG_LIST.get(i).getInstanceName())
253                   .charAt(0)).append("\t\t");
254         } else
255         {
256           sb.append("-").append("\t\t");
257         }
258       }
259       System.out.println(sb);
260     }
261     System.out.println("###################################################################");
262   }
263 
264   static final class MyProcess
265   {
266     private final String instanceName;
267     private HelixManager manager;
268 
269     public MyProcess(String instanceName)
270     {
271       this.instanceName = instanceName;
272     }
273 
274     public void start() throws Exception
275     {
276       manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
277           instanceName, InstanceType.PARTICIPANT, ZK_ADDRESS);
278 
279       MasterSlaveStateModelFactory stateModelFactory = new MasterSlaveStateModelFactory(
280           instanceName);
281 
282       StateMachineEngine stateMach = manager.getStateMachineEngine();
283       stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory);
284       manager.connect();
285     }
286 
287     public void stop()
288     {
289       manager.disconnect();
290     }
291   }
292 
293 }