1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import org.I0Itec.zkclient.exception.ZkInterruptedException;
27 import org.apache.log4j.Logger;
28
29
30 public class ZkCacheEventThread extends Thread
31 {
32
33 private static final Logger LOG =
34 Logger.getLogger(ZkCacheEventThread.class);
35 private final BlockingQueue<ZkCacheEvent> _events = new LinkedBlockingQueue<ZkCacheEvent>();
36 private static AtomicInteger _eventId = new AtomicInteger(0);
37
38 static abstract class ZkCacheEvent
39 {
40
41 private final String _description;
42
43 public ZkCacheEvent(String description)
44 {
45 _description = description;
46 }
47
48 public abstract void run() throws Exception;
49
50 @Override
51 public String toString()
52 {
53 return "ZkCacheEvent[" + _description + "]";
54 }
55 }
56
57 ZkCacheEventThread(String name)
58 {
59 setDaemon(true);
60 setName("ZkCache-EventThread-" + getId() + "-" + name);
61 }
62
63 @Override
64 public void run()
65 {
66 LOG.info("Starting ZkCache event thread.");
67 try
68 {
69 while (!isInterrupted())
70 {
71 ZkCacheEvent zkEvent = _events.take();
72 int eventId = _eventId.incrementAndGet();
73 LOG.debug("Delivering event #" + eventId + " " + zkEvent);
74 try
75 {
76 zkEvent.run();
77 }
78 catch (InterruptedException e)
79 {
80 interrupt();
81 }
82 catch (ZkInterruptedException e)
83 {
84 interrupt();
85 }
86 catch (Throwable e)
87 {
88 LOG.error("Error handling event " + zkEvent, e);
89 }
90 LOG.debug("Delivering event #" + eventId + " done");
91 }
92 }
93 catch (InterruptedException e)
94 {
95 LOG.info("Terminate ZkClient event thread.");
96 }
97 }
98
99 public void send(ZkCacheEvent event)
100 {
101 if (!isInterrupted())
102 {
103 LOG.debug("New event: " + event);
104 _events.add(event);
105 }
106 }
107 }