1 package org.apache.helix.mock.spectator;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.List;
23
24 import org.I0Itec.zkclient.IDefaultNameSpace;
25 import org.I0Itec.zkclient.ZkServer;
26 import org.apache.helix.HelixManager;
27 import org.apache.helix.HelixManagerFactory;
28 import org.apache.helix.InstanceType;
29 import org.apache.helix.ZNRecord;
30 import org.apache.helix.manager.zk.ZNRecordSerializer;
31 import org.apache.helix.model.InstanceConfig;
32 import org.apache.helix.spectator.RoutingTableProvider;
33 import org.apache.helix.tools.ClusterSetup;
34 import org.apache.helix.util.HelixUtil;
35
36
37
38
39
40
41
42 public class MockSpectatorProcess
43 {
44 private static final int port = 2188;
45 static long runId = System.currentTimeMillis();
46 private static final String dataDir = "/tmp/zkDataDir-" + runId;
47
48 private static final String logDir = "/tmp/zkLogDir-" + runId;
49
50 static String clusterName = "mock-cluster-" + runId;
51
52 static String zkConnectString = "localhost:2188";
53
54 private final RoutingTableProvider _routingTableProvider;
55 private static ZkServer zkServer;
56
57 public MockSpectatorProcess()
58 {
59 _routingTableProvider = new RoutingTableProvider();
60 }
61
62 public static void main(String[] args) throws Exception
63 {
64 setup();
65 zkServer.getZkClient().setZkSerializer(new ZNRecordSerializer());
66 ZNRecord record = zkServer.getZkClient().readData(
67 HelixUtil.getIdealStatePath(clusterName, "TestDB"));
68
69 String externalViewPath = HelixUtil.getExternalViewPath(clusterName, "TestDB");
70
71 MockSpectatorProcess process = new MockSpectatorProcess();
72 process.start();
73
74 process.routeRequest("TestDB", "TestDB_1");
75
76
77 zkServer.getZkClient().createPersistent(externalViewPath,record);
78
79 Thread.sleep(1000);
80 process.routeRequest("TestDB", "TestDB_1");
81 System.exit(1);
82 }
83
84 private static void setup()
85 {
86
87 IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace()
88 {
89 @Override
90 public void createDefaultNameSpace(org.I0Itec.zkclient.ZkClient client)
91 {
92 client.deleteRecursive("/" + clusterName);
93
94 }
95 };
96
97 zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
98 zkServer.start();
99 ClusterSetup clusterSetup = new ClusterSetup(zkConnectString);
100 clusterSetup.setupTestCluster(clusterName);
101 try
102 {
103 Thread.sleep(1000);
104 } catch (InterruptedException e)
105 {
106 e.printStackTrace();
107 }
108 }
109
110 public void routeRequest(String database, String partition)
111 {
112 List<InstanceConfig> masters;
113 List<InstanceConfig> slaves;
114 masters = _routingTableProvider.getInstances(database, partition, "MASTER");
115 if (masters != null && !masters.isEmpty())
116 {
117 System.out.println("Available masters to route request");
118 for (InstanceConfig config : masters)
119 {
120 System.out.println("HostName:" + config.getHostName() + " Port:"
121 + config.getPort());
122 }
123 } else
124 {
125 System.out.println("No masters available to route request");
126 }
127 slaves = _routingTableProvider.getInstances(database, partition, "SLAVE");
128 if (slaves != null && !slaves.isEmpty())
129 {
130 System.out.println("Available slaves to route request");
131 for (InstanceConfig config : slaves)
132 {
133 System.out.println("HostName:" + config.getHostName() + " Port:"
134 + config.getPort());
135 }
136 } else
137 {
138 System.out.println("No slaves available to route request");
139 }
140 }
141
142 public void start()
143 {
144
145 try
146 {
147 HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName,
148 null,
149 InstanceType.SPECTATOR,
150 zkConnectString);
151
152
153 manager.connect();
154 manager.addExternalViewChangeListener(_routingTableProvider);
155 } catch (Exception e)
156 {
157 e.printStackTrace();
158 }
159 }
160 }