1 package org.apache.helix.controller.restlet;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.net.InetAddress;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Timer;
26 import java.util.TimerTask;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import org.I0Itec.zkclient.DataUpdater;
31 import org.apache.helix.AccessOption;
32 import org.apache.helix.BaseDataAccessor;
33 import org.apache.helix.ZNRecord;
34 import org.apache.helix.manager.zk.ZNRecordSerializer;
35 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
36 import org.apache.helix.manager.zk.ZkClient;
37 import org.apache.log4j.Logger;
38 import org.restlet.Component;
39 import org.restlet.Context;
40 import org.restlet.data.Protocol;
41
42
43
44
45
46
47
48
49
50
51
52
53 public class ZKPropertyTransferServer
54 {
55 public static final String PORT = "port";
56 public static String RESTRESOURCENAME = "ZNRecordUpdates";
57 public static final String SERVER = "ZKPropertyTransferServer";
58
59
60 public static int PERIOD = 10 * 1000;
61
62 public static int MAX_UPDATE_LIMIT = 10000;
63 private static Logger LOG = Logger.getLogger(ZKPropertyTransferServer.class);
64
65 int _localWebservicePort;
66 String _webserviceUrl;
67 ZkBaseDataAccessor<ZNRecord> _accessor;
68 String _zkAddress;
69
70 AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>> _dataBufferRef
71 = new AtomicReference<ConcurrentHashMap<String, ZNRecordUpdate>>();
72
73 boolean _initialized = false;
74 boolean _shutdownFlag = false;
75 Component _component = null;
76 Timer _timer = null;
77
78
79
80
81 class ZKPropertyTransferTask extends TimerTask
82 {
83 @Override
84 public void run()
85 {
86 try
87 {
88 sendData();
89 }
90 catch(Throwable t)
91 {
92 LOG.error("", t);
93 }
94
95 }
96 }
97
98 void sendData()
99 {
100 LOG.info("ZKPropertyTransferServer transfering data to zookeeper");
101 ConcurrentHashMap<String, ZNRecordUpdate> updateCache = null;
102
103 synchronized(_dataBufferRef)
104 {
105 updateCache = _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
106 }
107
108 if(updateCache != null)
109 {
110 List<String> paths = new ArrayList<String>();
111 List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
112 List<ZNRecord> vals = new ArrayList<ZNRecord>();
113
114 for(ZNRecordUpdate holder : updateCache.values())
115 {
116 paths.add(holder.getPath());
117 updaters.add(holder.getZNRecordUpdater());
118 vals.add(holder.getRecord());
119 }
120
121 long timeStart = System.currentTimeMillis();
122 if(paths.size() > 0)
123 {
124 _accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT);
125 }
126 LOG.info("ZKPropertyTransferServer updated " + vals.size() + " records in " + (System.currentTimeMillis() - timeStart) + " ms");
127 }
128 else
129 {
130 LOG.warn("null _dataQueueRef. Should be in the beginning only");
131 }
132 }
133
134 static ZKPropertyTransferServer _instance = new ZKPropertyTransferServer();
135
136 private ZKPropertyTransferServer()
137 {
138 _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
139 }
140
141 public static ZKPropertyTransferServer getInstance()
142 {
143 return _instance;
144 }
145
146 public boolean isInitialized()
147 {
148 return _initialized;
149 }
150
151 public void init(int localWebservicePort, String zkAddress)
152 {
153 if(!_initialized && !_shutdownFlag)
154 {
155 LOG.error("Initializing with port " + localWebservicePort + " zkAddress: " + zkAddress);
156 _localWebservicePort = localWebservicePort;
157 ZkClient zkClient = new ZkClient(zkAddress);
158 zkClient.setZkSerializer(new ZNRecordSerializer());
159 _accessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
160 _zkAddress = zkAddress;
161 startServer();
162 }
163 else
164 {
165 LOG.error("Already initialized with port " + _localWebservicePort + " shutdownFlag: " + _shutdownFlag);
166 }
167 }
168
169 public String getWebserviceUrl()
170 {
171 if(!_initialized || _shutdownFlag)
172 {
173 LOG.debug("inited:" + _initialized + " shutdownFlag:"+_shutdownFlag+" , return");
174 return null;
175 }
176 return _webserviceUrl;
177 }
178
179
180
181
182
183 void enqueueData(ZNRecordUpdate e)
184 {
185 if(!_initialized || _shutdownFlag)
186 {
187 LOG.error("zkDataTransferServer inited:" + _initialized
188 + " shutdownFlag:"+_shutdownFlag+" , return");
189 return;
190 }
191
192 synchronized(_dataBufferRef)
193 {
194 e.getRecord().setSimpleField(SERVER, _webserviceUrl);
195 if(_dataBufferRef.get().containsKey(e.getPath()))
196 {
197 ZNRecord oldVal = _dataBufferRef.get().get(e.getPath()).getRecord();
198 oldVal = e.getZNRecordUpdater().update(oldVal);
199 _dataBufferRef.get().get(e.getPath())._record = oldVal;
200 }
201 else
202 {
203 _dataBufferRef.get().put(e.getPath(), e);
204 }
205 }
206 if(_dataBufferRef.get().size() > MAX_UPDATE_LIMIT)
207 {
208 sendData();
209 }
210 }
211
212 void startServer()
213 {
214 LOG.info("zkDataTransferServer starting on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
215
216 _component = new Component();
217
218 _component.getServers().add(Protocol.HTTP, _localWebservicePort);
219 Context applicationContext = _component.getContext().createChildContext();
220 applicationContext.getAttributes().put(SERVER, this);
221 applicationContext.getAttributes().put(PORT, "" + _localWebservicePort);
222 ZkPropertyTransferApplication application = new ZkPropertyTransferApplication(
223 applicationContext);
224
225 _component.getDefaultHost().attach(application);
226 _timer = new Timer(true);
227 _timer.schedule(new ZKPropertyTransferTask(), PERIOD , PERIOD);
228
229 try
230 {
231 _webserviceUrl
232 = "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":" + _localWebservicePort
233 + "/" + RESTRESOURCENAME;
234 _component.start();
235 _initialized = true;
236 }
237 catch (Exception e)
238 {
239 LOG.error("", e);
240 }
241 LOG.info("zkDataTransferServer started on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
242 }
243
244 public void shutdown()
245 {
246 if(_shutdownFlag)
247 {
248 LOG.error("ZKPropertyTransferServer already has been shutdown...");
249 return;
250 }
251 LOG.info("zkDataTransferServer shuting down on Port " + _localWebservicePort + " zkAddress " + _zkAddress);
252 if(_timer != null)
253 {
254 _timer.cancel();
255 }
256 if(_component != null)
257 {
258 try
259 {
260 _component.stop();
261 }
262 catch (Exception e)
263 {
264 LOG.error("", e);
265 }
266 }
267 _shutdownFlag = true;
268 }
269
270 public void reset()
271 {
272 if(_shutdownFlag == true)
273 {
274 _shutdownFlag = false;
275 _initialized = false;
276 _component = null;
277 _timer = null;
278 _dataBufferRef.getAndSet(new ConcurrentHashMap<String, ZNRecordUpdate>());
279 }
280 }
281 }