1 package org.apache.helix;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.util.ArrayList;
23 import java.util.Iterator;
24 import java.util.concurrent.ConcurrentLinkedQueue;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import org.I0Itec.zkclient.exception.ZkNoNodeException;
29 import org.apache.log4j.Logger;
30
// TODO: move to manager.zk
32 public class GroupCommit
33 {
34 private static Logger LOG = Logger.getLogger(GroupCommit.class);
35 private static class Queue
36 {
37 final AtomicReference<Thread> _running = new AtomicReference<Thread>();
38 final ConcurrentLinkedQueue<Entry> _pending = new ConcurrentLinkedQueue<Entry>();
39 }
40
41 private static class Entry
42 {
43 final String _key;
44 final ZNRecord _record;
45 AtomicBoolean _sent = new AtomicBoolean(false);
46
47 Entry(String key, ZNRecord record)
48 {
49 _key = key;
50 _record = record;
51 }
52 }
53
54 private final Queue[] _queues = new Queue[100];
55
56 // potential memory leak if we add resource and remove resource
57 // TODO: move the cache logic to data accessor
58 // private final Map<String, ZNRecord> _cache = new ConcurrentHashMap<String, ZNRecord>();
59
60
61 public GroupCommit()
62 {
63 // Don't use Arrays.fill();
64 for (int i = 0; i < _queues.length; ++i)
65 {
66 _queues[i] = new Queue();
67 }
68 }
69
70 private Queue getQueue(String key)
71 {
72 return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length];
73 }
74
75 public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key, ZNRecord record)
76 {
77 Queue queue = getQueue(key);
78 Entry entry = new Entry(key, record);
79
80 queue._pending.add(entry);
81
82 while (!entry._sent.get())
83 {
84 if (queue._running.compareAndSet(null, Thread.currentThread()))
85 {
86 ArrayList<Entry> processed = new ArrayList<Entry>();
87 try
88 {
89 if (queue._pending.peek() == null)
90 return true;
91
92 // remove from queue
93 Entry first = queue._pending.poll();
94 processed.add(first);
95
96 String mergedKey = first._key;
97 // ZNRecord merged = _cache.get(mergedKey);
98 ZNRecord merged = null;
99
100 try
101 {
102 // accessor will fallback to zk if not found in cache
103 merged = accessor.get(mergedKey, null, options);
104 }
105 catch (ZkNoNodeException e)
106 {
107 // OK.
108 }
109
110 /**
111 * If the local cache does not contain a value, need to check if there is a
112 * value in ZK; use it as initial value if exists
113 */
114 if (merged == null)
115 {
116 // ZNRecord valueOnZk = null;
117 // try
118 // {
119 // valueOnZk = accessor.get(mergedKey, null, 0);
120 // }
121 // catch(Exception e)
122 // {
123 // LOG.info(e);
124 // }
125 // if(valueOnZk != null)
126 // {
127 // merged = valueOnZk;
128 // merged.merge(first._record);
129 // }
130 // else // Zk path has null data. use the first record as initial record.
131 {
132 merged = new ZNRecord(first._record);
133 }
134 }
135 else
136 {
137 merged.merge(first._record);
138 }
139 Iterator<Entry> it = queue._pending.iterator();
140 while (it.hasNext())
141 {
142 Entry ent = it.next();
143 if (!ent._key.equals(mergedKey))
144 continue;
145 processed.add(ent);
146 merged.merge(ent._record);
147 // System.out.println("After merging:" + merged);
148 it.remove();
149 }
150 // System.out.println("size:"+ processed.size());
151 accessor.set(mergedKey, merged, options);
152 // accessor.set(mergedKey, merged, BaseDataAccessor.Option.PERSISTENT);
153 // _cache.put(mergedKey, merged);
154 }
155 finally
156 {
157 queue._running.set(null);
158 for (Entry e : processed)
159 {
160 synchronized (e)
161 {
162 e._sent.set(true);
163 e.notify();
164 }
165 }
166 }
167 }
168 else
169 {
170 synchronized (entry)
171 {
172 try
173 {
174 entry.wait(10);
175 }
176 catch (InterruptedException e)
177 {
178 e.printStackTrace();
179 return false;
180 }
181 }
182 }
183 }
184 return true;
185 }
186
187 }