View Javadoc

1   package org.apache.helix.util;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Map;
23  import java.util.concurrent.ConcurrentHashMap;
24  
25  import org.apache.helix.HelixException;
26  import org.apache.helix.manager.zk.ZNRecordSerializer;
27  import org.apache.helix.manager.zk.ZkClient;
28  import org.apache.zookeeper.ZooKeeper.States;
29  
30  
31  public class ZKClientPool
32  {
33    static final Map<String, ZkClient> _zkClientMap = new ConcurrentHashMap<String, ZkClient>();
34    static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
35  
36    public static ZkClient getZkClient(String zkServer)
37    {
38      // happy path that we cache the zkclient and it's still connected
39      if (_zkClientMap.containsKey(zkServer))
40      {
41        ZkClient zkClient = _zkClientMap.get(zkServer);
42        if (zkClient.getConnection().getZookeeperState() == States.CONNECTED)
43        {
44          return zkClient;
45        }
46      }
47  
48      synchronized (_zkClientMap)
49      {
50        // if we cache a stale zkclient, purge it
51        if (_zkClientMap.containsKey(zkServer))
52        {
53          ZkClient zkClient = _zkClientMap.get(zkServer);
54          if (zkClient.getConnection().getZookeeperState() != States.CONNECTED)
55          {
56            _zkClientMap.remove(zkServer);
57          }
58        }
59  
60        // get a new zkclient
61        if (!_zkClientMap.containsKey(zkServer))
62        {
63          ZkClient zkClient = new ZkClient(zkServer, DEFAULT_SESSION_TIMEOUT, 
64              ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
65  
66          _zkClientMap.put(zkServer, zkClient);
67        }
68        return _zkClientMap.get(zkServer);
69      }
70    }
71  
72    public static void reset()
73    {
74      _zkClientMap.clear();
75    }
76    
77    public static void main(String[] args) throws InterruptedException
78    {
79      Thread /*_dataSampleThread = new Thread(new Runnable()
80      {
81        @Override
82        public void run()
83        {
84          int i = 0;
85          while(!Thread.currentThread().isInterrupted())
86          {
87            try
88            {
89              // if the queue is empty, sleep 100 ms and try again
90              Thread.sleep(1000);
91              System.out.println(i++ + "...");
92              throw new RuntimeException("" + i);
93            }
94            catch (InterruptedException e)
95            {
96              System.out.println("Collector thread interrupted" + e);
97              return;
98            }
99            catch(Throwable th)
100           {
101             System.out.println("Collector thread exception/ error" + th);
102           }
103         }
104       }
105     });
106     _dataSampleThread.start();
107     
108     Thread.sleep(10000);
109     _dataSampleThread.interrupt();
110     */
111     _dataSampleThread = new Thread(new Runnable()
112     {
113       @Override
114       public void run()
115       {
116         int i = 0;
117         while(!Thread.currentThread().isInterrupted())
118         {
119           
120             // if the queue is empty, sleep 100 ms and try again
121             try
122             {
123               Thread.sleep(1000);
124             } catch (InterruptedException e)
125             {
126               // TODO Auto-generated catch block
127               e.printStackTrace();
128             }
129             System.out.println(i++ + "...");
130             throw new Error("" + i);
131           
132         }
133       }
134     });
135     _dataSampleThread.start();
136     
137     Thread.sleep(10000);
138     _dataSampleThread.interrupt();
139   }
140 }