View Javadoc

1   package org.apache.helix.controller.restlet;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.net.InetAddress;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Timer;
26  import java.util.TimerTask;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.atomic.AtomicReference;
29  
30  import org.I0Itec.zkclient.DataUpdater;
31  import org.apache.helix.AccessOption;
32  import org.apache.helix.BaseDataAccessor;
33  import org.apache.helix.ZNRecord;
34  import org.apache.helix.manager.zk.ZNRecordSerializer;
35  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
36  import org.apache.helix.manager.zk.ZkClient;
37  import org.apache.log4j.Logger;
38  import org.restlet.Component;
39  import org.restlet.Context;
40  import org.restlet.data.Protocol;
41  
42  /**
43   * Controller side restlet server that receives ZNRecordUpdate requests from
44   * clients, and batch the ZNRecordUpdate and apply them to zookeeper. This is 
45   * to optimize the concurrency level of zookeeper access for ZNRecord updates 
46   * that does not require real-time, like message handling status updates and 
47   * healthcheck reports. 
48   * 
49   * As one server will be used by multiple helix controllers that runs on the same machine,
50   * This class is designed as a singleton. Application is responsible to call init() 
51   * and shutdown() on the getInstance().
52   * */
53  public class ZKPropertyTransferServer
54  {
55    public static final String PORT = "port";
56    public static String RESTRESOURCENAME = "ZNRecordUpdates";
57    public static final String SERVER = "ZKPropertyTransferServer";
58    
59    // Frequency period for the ZNRecords are batch written to zookeeper 
60    public static int PERIOD = 10 * 1000;
61    // If the buffered ZNRecord updates exceed the limit, do a zookeeper batch update.
62    public static int MAX_UPDATE_LIMIT = 10000;
63    private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
64    
65    int _localWebservicePort;
66    String _webserviceUrl;
67    ZkBaseDataAccessor<ZNRecord> _accessor;
68    String _zkAddress;
69    
70    AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
71      = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
72    
73    boolean _initialized = false;
74    boolean _shutdownFlag = false;
75    Component _component = null;
76    Timer _timer = null;
77    
78    /**
79     * Timertask for zookeeper batched writes
80     * */
81    class ZKPropertyTransferTask extends TimerTask
82    {
83      @Override
84      public void run()
85      {
86        try
87        {
88          sendData();
89        }
90        catch(Throwable t)
91        {
92          LOG.error("", t);
93        }
94      
95      }
96    }
97    
98    void sendData()
99    {
100     LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
101     ConcurrentHashMap<String, ZNRecordUpdate> updateCache  = null;
102     
103     synchronized(_dataBufferRef)
104     {
105       updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
106     }
107     
108     if(updateCache != null)
109     {
110       List<String> paths = new ArrayList<String>();
111       List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
112       List<ZNRecord> vals = new ArrayList<ZNRecord>();
113       // BUGBUG : what if the instance is dropped? 
114       for(ZNRecordUpdate holder : updateCache.values())
115       {
116         paths.add(holder.getPath());
117         updaters.add(holder.getZNRecordUpdater());
118         vals.add(holder.getRecord());
119       }
120       // Batch write the accumulated updates into zookeeper
121       long timeStart = System.currentTimeMillis();
122       if(paths.size() > 0)
123       {
124         _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
125       }
126       LOG.info("ZKPropertyTransferServer updated " + vals.size() + " records in " + (System.currentTimeMillis() - timeStart) + " ms");
127     }
128     else
129     {
130       LOG.warn("null _dataQueueRef. Should be in the beginning only");
131     }
132   }
133   
134   static ZKPropertyTransferServer _instance = new ZKPropertyTransferServer();
135   
136   private ZKPropertyTransferServer()
137   {
138     _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
139   }
140   
141   public static ZKPropertyTransferServer getInstance()
142   {
143     return _instance;
144   }
145   
146   public boolean isInitialized()
147   {
148     return _initialized;
149   }
150   
151   public void init(int localWebservicePort, String zkAddress)
152   {
153     if(!_initialized && !_shutdownFlag)
154     {
155       LOG.error("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
156       _localWebservicePort = localWebservicePort;
157       ZkClient zkClient = new ZkClient(zkAddress);
158       zkClient.setZkSerializer(new ZNRecordSerializer());
159       _accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
160       _zkAddress = zkAddress;
161       startServer();
162     }
163     else
164     {
165       LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: " + _shutdownFlag);
166     }
167   }
168   
169   public String getWebserviceUrl()
170   {
171     if(!_initialized || _shutdownFlag)
172     {
173       LOG.debug("inited:" + _initialized + " shutdownFlag:"+_shutdownFlag+" , return");
174       return null;
175     }
176     return _webserviceUrl;
177   }
178   
179   /** Add an ZNRecordUpdate to the change queue. 
180    *  Called by the webservice front-end.
181    *
182    */
183   void enqueueData(ZNRecordUpdate e)
184   {
185     if(!_initialized || _shutdownFlag)
186     {
187       LOG.error("zkDataTransferServer inited:" + _initialized 
188           + " shutdownFlag:"+_shutdownFlag+" , return");
189       return;
190     }
191     // Do local merge if receive multiple update on the same path
192     synchronized(_dataBufferRef)
193     {
194       e.getRecord().setSimpleField(SERVER, _webserviceUrl);
195       if(_dataBufferRef.get().containsKey(e.getPath()))
196       {
197         ZNRecord oldVal = _dataBufferRef.get().get(e.getPath()).getRecord();
198         oldVal = e.getZNRecordUpdater().update(oldVal);
199         _dataBufferRef.get().get(e.getPath())._record = oldVal;
200       }
201       else
202       {
203         _dataBufferRef.get().put(e.getPath(), e);
204       }
205     }
206     if(_dataBufferRef.get().size() > MAX_UPDATE_LIMIT)
207     {
208       sendData();
209     }
210   }
211   
212   void startServer()
213   {
214     LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
215     
216     _component = new Component();
217     
218     _component.getServers().add(Protocol.HTTP, _localWebservicePort);
219     Context applicationContext = _component.getContext().createChildContext();
220     applicationContext.getAttributes().put(SERVER, this);
221     applicationContext.getAttributes().put(PORT, "" + _localWebservicePort);
222     ZkPropertyTransferApplication application = new ZkPropertyTransferApplication(
223         applicationContext);
224     // Attach the application to the component and start it
225     _component.getDefaultHost().attach(application);
226     _timer = new Timer(true);
227     _timer.schedule(new ZKPropertyTransferTask(), PERIOD , PERIOD);
228     
229     try
230     {
231       _webserviceUrl 
232         = "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":" + _localWebservicePort 
233               + "/" + RESTRESOURCENAME;
234       _component.start();
235       _initialized = true;
236     }
237     catch (Exception e)
238     {
239       LOG.error("", e);
240     }
241     LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
242   }
243   
244   public void shutdown()
245   {
246     if(_shutdownFlag)
247     {
248       LOG.error("ZKPropertyTransferServer already has been shutdown...");
249       return;
250     }
251     LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
252     if(_timer != null)
253     {
254       _timer.cancel();
255     }
256     if(_component != null)
257     {
258       try
259       {
260         _component.stop();
261       }
262       catch (Exception e)
263       {
264         LOG.error("", e);
265       }
266     }
267     _shutdownFlag = true;
268   }
269 
270   public void reset()
271   {
272     if(_shutdownFlag == true)
273     {
274       _shutdownFlag = false;
275       _initialized = false;
276       _component = null;
277       _timer = null;
278       _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
279     }
280   }
281 }