1 package org.apache.helix.controller.restlet;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.StringWriter;
23 import java.util.Map;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.helix.ZNRecord;
35 import org.apache.log4j.Logger;
36 import org.codehaus.jackson.map.ObjectMapper;
37 import org.restlet.Client;
38 import org.restlet.data.MediaType;
39 import org.restlet.data.Method;
40 import org.restlet.data.Protocol;
41 import org.restlet.data.Reference;
42 import org.restlet.data.Request;
43 import org.restlet.data.Response;
44 import org.restlet.data.Status;
45
46 public class ZkPropertyTransferClient
47 {
48 private static Logger LOG = Logger.getLogger(ZkPropertyTransferClient.class);
49 public static final int DEFAULT_MAX_CONCURRENTTASKS = 2;
50 public static int SEND_PERIOD = 10 * 1000;
51
52 public static final String USE_PROPERTYTRANSFER = "UsePropertyTransfer";
53
54 int _maxConcurrentTasks;
55 ExecutorService _executorService;
56 Client[] _clients;
57 AtomicInteger _requestCount = new AtomicInteger(0);
58
59
60 AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
61 = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
62 Timer _timer;
63 volatile String _webServiceUrl = "";
64
65 public ZkPropertyTransferClient(int maxConcurrentTasks)
66 {
67 _maxConcurrentTasks = maxConcurrentTasks;
68 _executorService = Executors.newFixedThreadPool(_maxConcurrentTasks);
69 _clients = new Client[_maxConcurrentTasks];
70 for(int i = 0; i< _clients.length; i++)
71 {
72 _clients[i] = new Client(Protocol.HTTP);
73 }
74 _timer = new Timer(true);
75 _timer.schedule(new SendZNRecordTimerTask(), SEND_PERIOD, SEND_PERIOD);
76 _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
77 }
78
79 class SendZNRecordTimerTask extends TimerTask
80 {
81 @Override
82 public void run()
83 {
84 sendUpdateBatch();
85 }
86 }
87
88 public void enqueueZNRecordUpdate(ZNRecordUpdate update, String webserviceUrl)
89 {
90 try
91 {
92 LOG.info("Enqueue update to " + update.getPath() + " opcode: " + update.getOpcode() + " to " + webserviceUrl);
93 _webServiceUrl = webserviceUrl;
94 update.getRecord().setSimpleField(USE_PROPERTYTRANSFER, "true");
95 synchronized(_dataBufferRef)
96 {
97 if(_dataBufferRef.get().containsKey(update._path))
98 {
99 ZNRecord oldVal = _dataBufferRef.get().get(update.getPath()).getRecord();
100 oldVal = update.getZNRecordUpdater().update(oldVal);
101 _dataBufferRef.get().get(update.getPath())._record = oldVal;
102 }
103 else
104 {
105 _dataBufferRef.get().put(update.getPath(), update);
106 }
107 }
108 }
109 catch(Exception e)
110 {
111 LOG.error("", e);
112 }
113 }
114
115 void sendUpdateBatch()
116 {
117 LOG.debug("Actual sending update with " + _dataBufferRef.get().size() + " updates to " + _webServiceUrl);
118 Map<String, ZNRecordUpdate> updateCache = null;
119
120 synchronized(_dataBufferRef)
121 {
122 updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
123 }
124
125 if(updateCache != null && updateCache.size() > 0)
126 {
127 ZNRecordUpdateUploadTask task = new ZNRecordUpdateUploadTask(updateCache, _webServiceUrl, _clients[_requestCount.intValue() % _maxConcurrentTasks]);
128 _requestCount.incrementAndGet();
129 _executorService.submit(task);
130 LOG.trace("Queue size :" + ((ThreadPoolExecutor)_executorService).getQueue().size());
131 }
132 }
133
134 public void shutdown()
135 {
136 LOG.info("Shutting down ZkPropertyTransferClient");
137 _executorService.shutdown();
138 _timer.cancel();
139 for(Client client: _clients)
140 {
141 try
142 {
143 client.stop();
144 }
145 catch (Exception e)
146 {
147 LOG.error("", e);
148 }
149 }
150 }
151
152 class ZNRecordUpdateUploadTask implements Callable<Void>
153 {
154 Map<String, ZNRecordUpdate> _updateMap;
155 String _webServiceUrl;
156 Client _client;
157
158 ZNRecordUpdateUploadTask(Map<String, ZNRecordUpdate> update, String webserviceUrl, Client client)
159 {
160 _updateMap = update;
161 _webServiceUrl = webserviceUrl;
162 _client = client;
163 }
164
165 @Override
166 public Void call() throws Exception
167 {
168 LOG.debug("Actual sending update with " + _updateMap.size() + " updates to " + _webServiceUrl);
169 long time = System.currentTimeMillis();
170 Reference resourceRef = new Reference(_webServiceUrl);
171 Request request = new Request(Method.PUT, resourceRef);
172
173 ObjectMapper mapper = new ObjectMapper();
174 StringWriter sw = new StringWriter();
175 try
176 {
177 mapper.writeValue(sw, _updateMap);
178 }
179 catch (Exception e)
180 {
181 LOG.error("",e);
182 }
183
184 request.setEntity(
185 ZNRecordUpdateResource.UPDATEKEY + "=" + sw, MediaType.APPLICATION_ALL);
186
187 Response response = _client.handle(request);
188
189 if(response.getStatus().getCode() != Status.SUCCESS_OK.getCode())
190 {
191 LOG.error("Status : " + response.getStatus());
192 }
193 LOG.info("Using time : " + (System.currentTimeMillis() - time));
194 return null;
195 }
196 }
197 }