1 package org.apache.helix.util;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24
25 import org.apache.helix.HelixException;
26 import org.apache.helix.manager.zk.ZNRecordSerializer;
27 import org.apache.helix.manager.zk.ZkClient;
28 import org.apache.zookeeper.ZooKeeper.States;
29
30
31 public class ZKClientPool
32 {
33 static final Map<String, ZkClient> _zkClientMap = new ConcurrentHashMap<String, ZkClient>();
34 static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
35
36 public static ZkClient getZkClient(String zkServer)
37 {
38 // happy path that we cache the zkclient and it's still connected
39 if (_zkClientMap.containsKey(zkServer))
40 {
41 ZkClient zkClient = _zkClientMap.get(zkServer);
42 if (zkClient.getConnection().getZookeeperState() == States.CONNECTED)
43 {
44 return zkClient;
45 }
46 }
47
48 synchronized (_zkClientMap)
49 {
50 // if we cache a stale zkclient, purge it
51 if (_zkClientMap.containsKey(zkServer))
52 {
53 ZkClient zkClient = _zkClientMap.get(zkServer);
54 if (zkClient.getConnection().getZookeeperState() != States.CONNECTED)
55 {
56 _zkClientMap.remove(zkServer);
57 }
58 }
59
60 // get a new zkclient
61 if (!_zkClientMap.containsKey(zkServer))
62 {
63 ZkClient zkClient = new ZkClient(zkServer, DEFAULT_SESSION_TIMEOUT,
64 ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
65
66 _zkClientMap.put(zkServer, zkClient);
67 }
68 return _zkClientMap.get(zkServer);
69 }
70 }
71
72 public static void reset()
73 {
74 _zkClientMap.clear();
75 }
76
77 public static void main(String[] args) throws InterruptedException
78 {
79 Thread /*_dataSampleThread = new Thread(new Runnable()
80 {
81 @Override
82 public void run()
83 {
84 int i = 0;
85 while(!Thread.currentThread().isInterrupted())
86 {
87 try
88 {
89 // if the queue is empty, sleep 100 ms and try again
90 Thread.sleep(1000);
91 System.out.println(i++ + "...");
92 throw new RuntimeException("" + i);
93 }
94 catch (InterruptedException e)
95 {
96 System.out.println("Collector thread interrupted" + e);
97 return;
98 }
99 catch(Throwable th)
100 {
101 System.out.println("Collector thread exception/ error" + th);
102 }
103 }
104 }
105 });
106 _dataSampleThread.start();
107
108 Thread.sleep(10000);
109 _dataSampleThread.interrupt();
110 */
111 _dataSampleThread = new Thread(new Runnable()
112 {
113 @Override
114 public void run()
115 {
116 int i = 0;
117 while(!Thread.currentThread().isInterrupted())
118 {
119
120 // if the queue is empty, sleep 100 ms and try again
121 try
122 {
123 Thread.sleep(1000);
124 } catch (InterruptedException e)
125 {
126 // TODO Auto-generated catch block
127 e.printStackTrace();
128 }
129 System.out.println(i++ + "...");
130 throw new Error("" + i);
131
132 }
133 }
134 });
135 _dataSampleThread.start();
136
137 Thread.sleep(10000);
138 _dataSampleThread.interrupt();
139 }
140 }