View Javadoc

1   package org.apache.helix.controller.restlet;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.StringWriter;
23  import java.util.Map;
24  import java.util.Timer;
25  import java.util.TimerTask;
26  import java.util.concurrent.Callable;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import org.apache.helix.ZNRecord;
35  import org.apache.log4j.Logger;
36  import org.codehaus.jackson.map.ObjectMapper;
37  import org.restlet.Client;
38  import org.restlet.data.MediaType;
39  import org.restlet.data.Method;
40  import org.restlet.data.Protocol;
41  import org.restlet.data.Reference;
42  import org.restlet.data.Request;
43  import org.restlet.data.Response;
44  import org.restlet.data.Status;
45  
46  public class ZkPropertyTransferClient
47  {
48    private static Logger LOG = Logger.getLogger(ZkPropertyTransferClient.class);
49    public static final int DEFAULT_MAX_CONCURRENTTASKS = 2;
50    public static int SEND_PERIOD = 10 * 1000;
51    
52    public static final String USE_PROPERTYTRANSFER = "UsePropertyTransfer";
53    
54    int _maxConcurrentTasks;
55    ExecutorService _executorService;
56    Client[] _clients;
57    AtomicInteger _requestCount = new AtomicInteger(0);
58    
59    // ZNRecord update buffer: key is the zkPath, value is the ZNRecordUpdate
60    AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
61      = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
62    Timer _timer;
63    volatile String _webServiceUrl = "";
64    
65    public ZkPropertyTransferClient(int maxConcurrentTasks)
66    {
67      _maxConcurrentTasks = maxConcurrentTasks;
68      _executorService = Executors.newFixedThreadPool(_maxConcurrentTasks);
69      _clients = new Client[_maxConcurrentTasks];
70      for(int i = 0; i< _clients.length; i++)
71      {
72        _clients[i] = new Client(Protocol.HTTP);
73      }
74      _timer = new Timer(true);
75      _timer.schedule(new SendZNRecordTimerTask(), SEND_PERIOD, SEND_PERIOD);
76      _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
77    }
78    
79    class SendZNRecordTimerTask extends TimerTask
80    {
81      @Override
82      public void run()
83      { 
84        sendUpdateBatch();
85      }
86    }
87    
88    public void enqueueZNRecordUpdate(ZNRecordUpdate update, String webserviceUrl)
89    {
90      try
91      {
92        LOG.info("Enqueue update to " + update.getPath() + " opcode: " + update.getOpcode() + " to " + webserviceUrl);
93        _webServiceUrl = webserviceUrl;
94        update.getRecord().setSimpleField(USE_PROPERTYTRANSFER, "true");
95        synchronized(_dataBufferRef)
96        {
97          if(_dataBufferRef.get().containsKey(update._path))
98          {
99            ZNRecord oldVal = _dataBufferRef.get().get(update.getPath()).getRecord();
100           oldVal = update.getZNRecordUpdater().update(oldVal);
101           _dataBufferRef.get().get(update.getPath())._record = oldVal;
102         }
103         else
104         {
105           _dataBufferRef.get().put(update.getPath(), update);
106         }
107       }
108     }
109     catch(Exception e)
110     {
111       LOG.error("", e);
112     }
113   }
114   
115   void sendUpdateBatch()
116   {
117     LOG.debug("Actual sending update with " + _dataBufferRef.get().size() + " updates to " + _webServiceUrl);
118     Map<String, ZNRecordUpdate> updateCache  = null;
119     
120     synchronized(_dataBufferRef)
121     {
122       updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
123     }
124     
125     if(updateCache != null && updateCache.size() > 0)
126     {
127       ZNRecordUpdateUploadTask task = new ZNRecordUpdateUploadTask(updateCache, _webServiceUrl, _clients[_requestCount.intValue() % _maxConcurrentTasks]);
128       _requestCount.incrementAndGet();
129       _executorService.submit(task);
130       LOG.trace("Queue size :" + ((ThreadPoolExecutor)_executorService).getQueue().size());
131     }
132   }
133   
134   public void shutdown()
135   {
136     LOG.info("Shutting down ZkPropertyTransferClient");
137     _executorService.shutdown();
138     _timer.cancel();
139     for(Client client: _clients)
140     {
141       try
142       {
143         client.stop();
144       }
145       catch (Exception e)
146       {
147         LOG.error("", e);
148       }
149     }
150   }
151   
152   class ZNRecordUpdateUploadTask implements Callable<Void>
153   {
154     Map<String, ZNRecordUpdate> _updateMap;
155     String _webServiceUrl;
156     Client _client;
157     
158     ZNRecordUpdateUploadTask(Map<String, ZNRecordUpdate> update, String webserviceUrl, Client client)
159     {
160       _updateMap = update;
161       _webServiceUrl = webserviceUrl;
162       _client = client;
163     }
164     
165     @Override
166     public Void call() throws Exception
167     {
168       LOG.debug("Actual sending update with " + _updateMap.size() + " updates to " + _webServiceUrl);
169       long time = System.currentTimeMillis();
170       Reference resourceRef = new Reference(_webServiceUrl);
171       Request request = new Request(Method.PUT, resourceRef);
172       
173       ObjectMapper mapper = new ObjectMapper();
174       StringWriter sw = new StringWriter();
175       try
176       {
177         mapper.writeValue(sw, _updateMap);
178       }
179       catch (Exception e)
180       {
181         LOG.error("",e);
182       }
183 
184       request.setEntity(
185           ZNRecordUpdateResource.UPDATEKEY + "=" + sw, MediaType.APPLICATION_ALL);
186       // This is a sync call. See com.noelios.restlet.http.StreamClientCall.sendRequest()
187       Response response = _client.handle(request);
188       
189       if(response.getStatus().getCode() != Status.SUCCESS_OK.getCode())
190       {
191         LOG.error("Status : " + response.getStatus());
192       }
193       LOG.info("Using time : " + (System.currentTimeMillis() - time));
194       return null;
195     }
196   }
197 }