1 package org.apache.helix.filestore;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.I0Itec.zkclient.DataUpdater;
23 import org.apache.log4j.Logger;
24 import org.apache.zookeeper.data.Stat;
25
26 import org.apache.helix.AccessOption;
27 import org.apache.helix.HelixManager;
28 import org.apache.helix.NotificationContext;
29 import org.apache.helix.ZNRecord;
30 import org.apache.helix.model.InstanceConfig;
31 import org.apache.helix.model.Message;
32 import org.apache.helix.participant.statemachine.StateModel;
33 import org.apache.helix.participant.statemachine.StateModelInfo;
34 import org.apache.helix.participant.statemachine.Transition;
35 import org.apache.helix.store.zk.ZkHelixPropertyStore;
36
37 @StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "MASTER",
38 "SLAVE" })
39 public class FileStoreStateModel extends StateModel
40 {
41 private final class HighWaterMarkUpdater implements DataUpdater<ZNRecord>
42 {
43 private final Message message;
44 private final ChangeRecord lastRecordProcessed;
45
46 private HighWaterMarkUpdater(Message message,
47 ChangeRecord lastRecordProcessed)
48 {
49 this.message = message;
50 this.lastRecordProcessed = lastRecordProcessed;
51 }
52
53 @Override
54 public ZNRecord update(ZNRecord currentData)
55 {
56 ZNRecord newRec = new ZNRecord(message.getResourceName());
57
58 if (currentData != null)
59 {
60 int currentGen = convertToInt(newRec.getSimpleField("currentGen"), 0);
61 int currentGenStartSeq = convertToInt(
62 newRec.getSimpleField("currentGenStartSeq"), 0);
63 int prevGen = convertToInt(newRec.getSimpleField("prevGen"), 0);
64 int prevGenEndSeq = convertToInt(
65 newRec.getSimpleField("prevGenEndSeq"), 0);
66 newRec.setSimpleField("currentGen", Integer.toString(currentGen + 1));
67 newRec.setSimpleField("currentGenStartSeq", Integer.toString(1));
68 if (currentGen > 0)
69 {
70 newRec.setSimpleField("prevGen", Integer.toString(currentGen));
71 int localEndSeq = 1;
72 if (lastRecordProcessed != null)
73 {
74 localEndSeq = (int) lastRecordProcessed.txid;
75 }
76 newRec.setSimpleField("prevGenEndSeq", "" + localEndSeq);
77 }
78 newRec.merge(currentData);
79 } else
80 {
81 newRec.setSimpleField("currentGen", Integer.toString(1));
82 newRec.setSimpleField("currentGenStartSeq", Integer.toString(1));
83 }
84 return newRec;
85
86 }
87
88 private int convertToInt(String number, int defaultValue)
89 {
90 try
91 {
92 if (number != null)
93 {
94 return Integer.parseInt(number);
95 }
96 } catch (Exception e)
97 {
98
99 }
100 return defaultValue;
101 }
102 }
103
104 private static Logger LOG = Logger.getLogger(FileStoreStateModel.class);
105
106 private final String _serverId;
107 private final String _partition;
108
109 private Replicator replicator;
110
111 private ChangeLogGenerator generator;
112
113 private FileSystemWatchService service;
114
115 private InstanceConfig instanceConfig;
116
117 public FileStoreStateModel(HelixManager manager, String resource,
118 String partition)
119 {
120 String clusterName = manager.getClusterName();
121 String instanceName = manager.getInstanceName();
122 instanceConfig = manager.getClusterManagmentTool().getInstanceConfig(
123 clusterName, instanceName);
124 replicator = new Replicator(instanceConfig, resource, partition);
125 try
126 {
127 manager.addExternalViewChangeListener(replicator);
128 } catch (Exception e)
129 {
130 e.printStackTrace();
131 }
132 _partition = partition;
133 _serverId = instanceName;
134 }
135
136
137
138
139
140
141
142
143
144 @Transition(from = "OFFLINE", to = "SLAVE")
145 public void onBecomeSlaveFromOffline(Message message,
146 NotificationContext context) throws Exception
147 {
148 System.out.println(_serverId + " transitioning from " + message.getFromState()
149 + " to " + message.getToState() + " for " + _partition);
150
151 replicator.start();
152 System.out.println(_serverId + " transitioned from " + message.getFromState()
153 + " to " + message.getToState() + " for " + _partition);
154 }
155
156
157
158
159
160
161
162
163
164 @Transition(from = "SLAVE", to = "MASTER")
165 public void onBecomeMasterFromSlave(final Message message,
166 NotificationContext context) throws Exception
167 {
168 replicator.stop();
169 System.out.println(_serverId + " transitioning from " + message.getFromState()
170 + " to " + message.getToState() + " for " + _partition);
171 ZkHelixPropertyStore<ZNRecord> helixPropertyStore = context.getManager()
172 .getHelixPropertyStore();
173 String checkpointDirPath = instanceConfig.getRecord().getSimpleField(
174 "check_point_dir");
175 CheckpointFile checkpointFile = new CheckpointFile(checkpointDirPath);
176 final ChangeRecord lastRecordProcessed = checkpointFile
177 .findLastRecordProcessed();
178 DataUpdater<ZNRecord> updater = new HighWaterMarkUpdater(message,
179 lastRecordProcessed);
180 helixPropertyStore.update(
181 "TRANSACTION_ID_METADATA" + "/" + message.getResourceName(), updater,
182 AccessOption.PERSISTENT);
183 Stat stat = new Stat();
184 ;
185 ZNRecord znRecord = helixPropertyStore.get("TRANSACTION_ID_METADATA" + "/"
186 + message.getResourceName(), stat, AccessOption.PERSISTENT);
187 int startGen = Integer.parseInt(znRecord.getSimpleField("currentGen"));
188 int startSeq = Integer.parseInt(znRecord
189 .getSimpleField("currentGenStartSeq"));
190 String fileStoreDir = instanceConfig.getRecord().getSimpleField("file_store_dir");
191 String changeLogDir = instanceConfig.getRecord().getSimpleField("change_log_dir");
192
193 generator = new ChangeLogGenerator(changeLogDir, startGen, startSeq);
194
195 long now = System.currentTimeMillis();
196 service = new FileSystemWatchService(fileStoreDir, now, generator);
197 service.start();
198 System.out.println(_serverId + " transitioned from " + message.getFromState()
199 + " to " + message.getToState() + " for " + _partition);
200 }
201
202
203
204
205
206
207
208
209
210 @Transition(from = "MASTER", to = "SLAVE")
211 public void onBecomeSlaveFromMaster(Message message,
212 NotificationContext context) throws Exception
213 {
214 service.stop();
215 LOG.info(_serverId + " transitioning from " + message.getFromState()
216 + " to " + message.getToState() + " for " + _partition);
217 replicator.start();
218 }
219
220 @Transition(from = "SLAVE", to = "OFFLINE")
221 public void onBecomeOfflineFromSlave(Message message,
222 NotificationContext context)
223 {
224 replicator.stop();
225 LOG.info(_serverId + " transitioning from " + message.getFromState()
226 + " to " + message.getToState() + " for " + _partition);
227 }
228
229 public void onBecomeDroppedFromOffline(Message message,
230 NotificationContext context)
231 {
232 LOG.info(_serverId + " Dropping partition " + _partition);
233 }
234
235 @Override
236 public void reset()
237 {
238 LOG.warn("Default reset() invoked");
239 }
240 }