View Javadoc

1   package org.apache.helix.mock.spectator;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.List;
23  
24  import org.I0Itec.zkclient.IDefaultNameSpace;
25  import org.I0Itec.zkclient.ZkServer;
26  import org.apache.helix.HelixManager;
27  import org.apache.helix.HelixManagerFactory;
28  import org.apache.helix.InstanceType;
29  import org.apache.helix.ZNRecord;
30  import org.apache.helix.manager.zk.ZNRecordSerializer;
31  import org.apache.helix.model.InstanceConfig;
32  import org.apache.helix.spectator.RoutingTableProvider;
33  import org.apache.helix.tools.ClusterSetup;
34  import org.apache.helix.util.HelixUtil;
35  
36  
37  /**
38   * A MockSpectatorProcess to demonstrate the integration with cluster manager.
39   * This uses Zookeeper in local mode and runs at port 2188
40   *
41   */
42  public class MockSpectatorProcess
43  {
44    private static final int port = 2188;
45    static long runId = System.currentTimeMillis();
46    private static final String dataDir = "/tmp/zkDataDir-" + runId;
47  
48    private static final String logDir = "/tmp/zkLogDir-" + runId;
49  
50    static String clusterName = "mock-cluster-" + runId;
51  
52    static String zkConnectString = "localhost:2188";
53  
54    private final RoutingTableProvider _routingTableProvider;
55    private static ZkServer zkServer;
56  
57    public MockSpectatorProcess()
58    {
59      _routingTableProvider = new RoutingTableProvider();
60    }
61  
62    public static void main(String[] args) throws Exception
63    {
64      setup();
65      zkServer.getZkClient().setZkSerializer(new ZNRecordSerializer());
66      ZNRecord record = zkServer.getZkClient().readData(
67          HelixUtil.getIdealStatePath(clusterName, "TestDB"));
68  
69      String externalViewPath = HelixUtil.getExternalViewPath(clusterName, "TestDB");
70  
71      MockSpectatorProcess process = new MockSpectatorProcess();
72      process.start();
73      //try to route, there is no master or slave available
74      process.routeRequest("TestDB", "TestDB_1");
75  
76      //update the externalview on zookeeper
77      zkServer.getZkClient().createPersistent(externalViewPath,record);
78      //sleep for sometime so that the ZK Callback is received.
79      Thread.sleep(1000);
80      process.routeRequest("TestDB", "TestDB_1");
81      System.exit(1);
82    }
83  
84    private static void setup()
85    {
86  
87      IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
88      {
89        @Override
90        public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient client)
91        {
92          client.deleteRecursive("/" + clusterName);
93  
94        }
95      };
96  
97      zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
98      zkServer.start();
99      ClusterSetup clusterSetup = new ClusterSetup(zkConnectString);
100     clusterSetup.setupTestCluster(clusterName);
101     try
102     {
103       Thread.sleep(1000);
104     } catch (InterruptedException e)
105     {
106       e.printStackTrace();
107     }
108   }
109 
110   public void routeRequest(String database, String partition)
111   {
112     List<InstanceConfig> masters;
113     List<InstanceConfig> slaves;
114     masters = _routingTableProvider.getInstances(database, partition, "MASTER");
115     if (masters != null && !masters.isEmpty())
116     {
117       System.out.println("Available masters to route request");
118       for (InstanceConfig config : masters)
119       {
120         System.out.println("HostName:" + config.getHostName() + " Port:"
121             + config.getPort());
122       }
123     } else
124     {
125       System.out.println("No masters available to route request");
126     }
127     slaves = _routingTableProvider.getInstances(database, partition, "SLAVE");
128     if (slaves != null && !slaves.isEmpty())
129     {
130       System.out.println("Available slaves to route request");
131       for (InstanceConfig config : slaves)
132       {
133         System.out.println("HostName:" + config.getHostName() + " Port:"
134             + config.getPort());
135       }
136     } else
137     {
138       System.out.println("No slaves available to route request");
139     }
140   }
141 
142   public void start()
143   {
144 
145     try
146     {
147       HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName,
148                                                                          null,
149                                                                          InstanceType.SPECTATOR,
150                                                                          zkConnectString);
151 
152 
153       manager.connect();
154       manager.addExternalViewChangeListener(_routingTableProvider);
155     } catch (Exception e)
156     {
157       e.printStackTrace();
158     }
159   }
160 }