View Javadoc

1   package org.apache.helix.manager.zk;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  import java.util.ArrayList;
22  import java.util.Iterator;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.atomic.AtomicReference;
26  
27  import org.I0Itec.zkclient.DataUpdater;
28  import org.I0Itec.zkclient.exception.ZkBadVersionException;
29  import org.I0Itec.zkclient.exception.ZkNoNodeException;
30  import org.apache.log4j.Logger;
31  import org.apache.zookeeper.data.Stat;
32  
33  public class HelixGroupCommit<T>
34  {
35    private static Logger LOG = Logger.getLogger(HelixGroupCommit.class);
36  
37    private static class Queue<T>
38    {
39      final AtomicReference<Thread>      _running = new AtomicReference<Thread>();
40      final ConcurrentLinkedQueue<Entry<T>> _pending = new ConcurrentLinkedQueue<Entry<T>>();
41    }
42  
43    private static class Entry<T>
44    {
45      final String         _key;
46      final DataUpdater<T> _updater;
47      AtomicBoolean        _sent = new AtomicBoolean(false);
48  
49      Entry(String key, DataUpdater<T> updater)
50      {
51        _key = key;
52        _updater = updater;
53      }
54    }
55  
56    private final Queue<T>[] _queues = new Queue[100];
57  
58    public HelixGroupCommit()
59    {
60      // Don't use Arrays.fill();
61      for (int i = 0; i < _queues.length; ++i)
62      {
63        _queues[i] = new Queue<T>();
64      }
65    }
66  
67    private Queue<T> getQueue(String key)
68    {
69      return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length];
70    }
71  
72    public boolean commit(ZkBaseDataAccessor<T> accessor,
73                          int options,
74                          String key,
75                          DataUpdater<T> updater)
76    {
77      Queue<T> queue = getQueue(key);
78      Entry<T> entry = new Entry<T>(key, updater);
79  
80      queue._pending.add(entry);
81  
82      while (!entry._sent.get())
83      {
84        if (queue._running.compareAndSet(null, Thread.currentThread()))
85        {
86          ArrayList<Entry<T>> processed = new ArrayList<Entry<T>>();
87          try
88          {
89            Entry<T> first = queue._pending.peek();
90            if (first == null)
91            {
92              return true;
93            }
94  
95            // remove from queue
96            // Entry first = queue._pending.poll();
97            // processed.add(first);
98  
99            String mergedKey = first._key;
100 
101           boolean retry;
102           do
103           {
104             retry = false;
105 
106             try
107             {
108               T merged = null;
109 
110               Stat readStat = new Stat();
111               try
112               {
113                 // accessor will fallback to zk if not found in cache
114                 merged = accessor.get(mergedKey, readStat, options);
115               }
116               catch (ZkNoNodeException e)
117               {
118                 // OK.
119               }
120 
121               // updater should handler merged == null
122               // merged = first._updater.update(merged);
123 
124               // iterate over processed if we are retrying
125               Iterator<Entry<T>> it = processed.iterator();
126               while (it.hasNext())
127               {
128                 Entry<T> ent = it.next();
129                 if (!ent._key.equals(mergedKey))
130                 {
131                   continue;
132                 }
133                 merged = ent._updater.update(merged);
134                 // System.out.println("After merging:" + merged);
135               }
136 
137               // iterate over queue._pending for newly coming requests
138               it = queue._pending.iterator();
139               while (it.hasNext())
140               {
141                 Entry<T> ent = it.next();
142                 if (!ent._key.equals(mergedKey))
143                 {
144                   continue;
145                 }
146                 processed.add(ent);
147                 merged = ent._updater.update(merged);
148                 // System.out.println("After merging:" + merged);
149                 it.remove();
150               }
151               // System.out.println("size:"+ processed.size());
152               accessor.set(mergedKey, merged, null, null, readStat.getVersion(), options);
153             }
154             catch (ZkBadVersionException e)
155             {
156               retry = true;
157             }
158           }
159           while (retry);
160         }
161         finally
162         {
163           queue._running.set(null);
164           for (Entry<T> e : processed)
165           {
166             synchronized (e)
167             {
168               e._sent.set(true);
169               e.notify();
170             }
171           }
172         }
173       }
174       else
175       {
176         synchronized (entry)
177         {
178           try
179           {
180             entry.wait(10);
181           }
182           catch (InterruptedException e)
183           {
184             e.printStackTrace();
185             return false;
186           }
187         }
188       }
189     }
190     return true;
191   }
192 }