View Javadoc

1   package org.apache.helix;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.Iterator;
24  import java.util.concurrent.ConcurrentLinkedQueue;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import org.I0Itec.zkclient.exception.ZkNoNodeException;
29  import org.apache.log4j.Logger;
30  
31  // TODO: move to mananger.zk
32  public class GroupCommit
33  { 
34    private static Logger  LOG = Logger.getLogger(GroupCommit.class);
35    private static class Queue
36    {
37      final AtomicReference<Thread>      _running = new AtomicReference<Thread>();
38      final ConcurrentLinkedQueue<Entry> _pending = new ConcurrentLinkedQueue<Entry>();
39    }
40  
41    private static class Entry
42    {
43      final String   _key;
44      final ZNRecord _record;
45      AtomicBoolean  _sent = new AtomicBoolean(false);
46  
47      Entry(String key, ZNRecord record)
48      {
49        _key = key;
50        _record = record;
51      }
52    }
53  
54    private final Queue[]               _queues = new Queue[100];
55    
56    // potential memory leak if we add resource and remove resource
57    // TODO: move the cache logic to data accessor
58  //  private final Map<String, ZNRecord> _cache  = new ConcurrentHashMap<String, ZNRecord>();
59  
60    
61    public GroupCommit()
62    {
63      // Don't use Arrays.fill();
64      for (int i = 0; i < _queues.length; ++i)
65      {
66        _queues[i] = new Queue();
67      }
68    }
69  
70    private Queue getQueue(String key)
71    {
72      return _queues[(key.hashCode() & Integer.MAX_VALUE) % _queues.length];
73    }
74  
75    public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key, ZNRecord record)
76    {
77      Queue queue = getQueue(key);
78      Entry entry = new Entry(key, record);
79  
80      queue._pending.add(entry);
81  
82      while (!entry._sent.get())
83      {
84        if (queue._running.compareAndSet(null, Thread.currentThread()))
85        {
86          ArrayList<Entry> processed = new ArrayList<Entry>();
87          try
88          {
89            if (queue._pending.peek() == null)
90              return true;
91  
92            // remove from queue
93            Entry first = queue._pending.poll();
94            processed.add(first);
95  
96            String mergedKey = first._key;
97  //          ZNRecord merged = _cache.get(mergedKey);
98            ZNRecord merged = null;
99            
100           try
101           {
102             // accessor will fallback to zk if not found in cache
103             merged = accessor.get(mergedKey, null, options);
104           }
105           catch (ZkNoNodeException e)
106           {
107             // OK.
108           }
109           
110           /**
111            * If the local cache does not contain a value, need to check if there is a 
112            * value in ZK; use it as initial value if exists
113            */
114           if (merged == null)
115           {
116 //            ZNRecord valueOnZk = null;
117 //            try
118 //            {
119 //              valueOnZk = accessor.get(mergedKey, null, 0);
120 //            }
121 //            catch(Exception e)
122 //            {
123 //              LOG.info(e);
124 //            }
125 //            if(valueOnZk != null)
126 //            {
127 //              merged = valueOnZk;
128 //              merged.merge(first._record);
129 //            }
130 //            else // Zk path has null data. use the first record as initial record.
131             {
132               merged = new ZNRecord(first._record);
133             }
134           }
135           else
136           {
137             merged.merge(first._record);
138           }
139           Iterator<Entry> it = queue._pending.iterator();
140           while (it.hasNext())
141           {
142             Entry ent = it.next();
143             if (!ent._key.equals(mergedKey))
144               continue;
145             processed.add(ent);
146             merged.merge(ent._record);
147             // System.out.println("After merging:" + merged);
148             it.remove();
149           }
150           // System.out.println("size:"+ processed.size());
151           accessor.set(mergedKey, merged, options);
152           // accessor.set(mergedKey, merged, BaseDataAccessor.Option.PERSISTENT);
153           // _cache.put(mergedKey, merged);
154         }
155         finally
156         {
157           queue._running.set(null);
158           for (Entry e : processed)
159           {
160             synchronized (e)
161             {
162               e._sent.set(true);
163               e.notify();
164             }
165           }
166         }
167       }
168       else
169       {
170         synchronized (entry)
171         {
172           try
173           {
174             entry.wait(10);
175           }
176           catch (InterruptedException e)
177           {
178             e.printStackTrace();
179             return false;
180           }
181         }
182       }
183     }
184     return true;
185   }
186 
187 }