View Javadoc

1   package org.apache.helix.recipes.rabbitmq;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.helix.manager.zk.ZKHelixAdmin;
23  import org.apache.helix.manager.zk.ZNRecordSerializer;
24  import org.apache.helix.manager.zk.ZkClient;
25  import org.apache.helix.model.IdealState.IdealStateModeProperty;
26  import org.apache.helix.model.StateModelDefinition;
27  import org.apache.helix.tools.StateModelConfigGenerator;
28  
29  public class SetupConsumerCluster
30  {
31    public static final String DEFAULT_CLUSTER_NAME = "rabbitmq-consumer-cluster";
32    public static final String DEFAULT_RESOURCE_NAME = "topic";
33    public static final int DEFAULT_PARTITION_NUMBER = 6;
34    public static final String DEFAULT_STATE_MODEL = "OnlineOffline";
35  
36    public static void main(String[] args)
37    {
38      if (args.length < 1)
39      {
40        System.err.println("USAGE: java SetupConsumerCluster zookeeperAddress (e.g. localhost:2181)");
41        System.exit(1);
42      }
43  
44      final String zkAddr = args[0];
45      final String clusterName = DEFAULT_CLUSTER_NAME;
46  
47      ZkClient zkclient = null;
48      try
49      {
50        zkclient = new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
51            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
52        ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
53  
54        // add cluster
55        admin.addCluster(clusterName, true);
56  
57        // add state model definition
58        StateModelConfigGenerator generator = new StateModelConfigGenerator();
59        admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL,
60            new StateModelDefinition(generator.generateConfigForOnlineOffline()));
61  
62        // add resource "topic" which has 6 partitions
63        String resourceName = DEFAULT_RESOURCE_NAME;
64        admin.addResource(clusterName, resourceName, DEFAULT_PARTITION_NUMBER, DEFAULT_STATE_MODEL,
65            IdealStateModeProperty.AUTO_REBALANCE.toString());
66        
67        admin.rebalance(clusterName, resourceName, 1);
68  
69      } finally
70      {
71        if (zkclient != null)
72        {
73          zkclient.close();
74        }
75      }
76    }
77  }