1 package org.apache.helix.manager.zk;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicReference;
26
27 import org.I0Itec.zkclient.DataUpdater;
28 import org.I0Itec.zkclient.exception.ZkBadVersionException;
29 import org.I0Itec.zkclient.exception.ZkNoNodeException;
30 import org.apache.log4j.Logger;
31 import org.apache.zookeeper.data.Stat;
32
33 public class HelixGroupCommit<T>
34 {
35 private static Logger LOG = Logger.getLogger(HelixGroupCommit.class);
36
37 private static class Queue<T>
38 {
39 final AtomicReference<Thread> _running = new AtomicReference<Thread>();
40 final ConcurrentLinkedQueue<Entry<T>> _pending = new ConcurrentLinkedQueue<Entry<T>>();
41 }
42
43 private static class Entry<T>
44 {
45 final String _key;
46 final DataUpdater<T> _updater;
47 AtomicBoolean _sent = new AtomicBoolean(false);
48
49 Entry(String key, DataUpdater<T> updater)
50 {
51 _key = key;
52 _updater = updater;
53 }
54 }
55
56 private final Queue<T>[] _queues = new Queue[100];
57
58 public HelixGroupCommit()
59 {
60
61 for (int i = 0; i < _queues.length; ++i)
62 {
63 _queues[i] = new Queue<T>();
64 }
65 }
66
67 private Queue<T> getQueue(String key)
68 {
69 return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length];
70 }
71
72 public boolean commit(ZkBaseDataAccessor<T> accessor,
73 int options,
74 String key,
75 DataUpdater<T> updater)
76 {
77 Queue<T> queue = getQueue(key);
78 Entry<T> entry = new Entry<T>(key, updater);
79
80 queue._pending.add(entry);
81
82 while (!entry._sent.get())
83 {
84 if (queue._running.compareAndSet(null, Thread.currentThread()))
85 {
86 ArrayList<Entry<T>> processed = new ArrayList<Entry<T>>();
87 try
88 {
89 Entry<T> first = queue._pending.peek();
90 if (first == null)
91 {
92 return true;
93 }
94
95
96
97
98
99 String mergedKey = first._key;
100
101 boolean retry;
102 do
103 {
104 retry = false;
105
106 try
107 {
108 T merged = null;
109
110 Stat readStat = new Stat();
111 try
112 {
113
114 merged = accessor.get(mergedKey, readStat, options);
115 }
116 catch (ZkNoNodeException e)
117 {
118
119 }
120
121
122
123
124
125 Iterator<Entry<T>> it = processed.iterator();
126 while (it.hasNext())
127 {
128 Entry<T> ent = it.next();
129 if (!ent._key.equals(mergedKey))
130 {
131 continue;
132 }
133 merged = ent._updater.update(merged);
134
135 }
136
137
138 it = queue._pending.iterator();
139 while (it.hasNext())
140 {
141 Entry<T> ent = it.next();
142 if (!ent._key.equals(mergedKey))
143 {
144 continue;
145 }
146 processed.add(ent);
147 merged = ent._updater.update(merged);
148
149 it.remove();
150 }
151
152 accessor.set(mergedKey, merged, null, null, readStat.getVersion(), options);
153 }
154 catch (ZkBadVersionException e)
155 {
156 retry = true;
157 }
158 }
159 while (retry);
160 }
161 finally
162 {
163 queue._running.set(null);
164 for (Entry<T> e : processed)
165 {
166 synchronized (e)
167 {
168 e._sent.set(true);
169 e.notify();
170 }
171 }
172 }
173 }
174 else
175 {
176 synchronized (entry)
177 {
178 try
179 {
180 entry.wait(10);
181 }
182 catch (InterruptedException e)
183 {
184 e.printStackTrace();
185 return false;
186 }
187 }
188 }
189 }
190 return true;
191 }
192 }