1 package org.apache.helix.mock.controller;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.IOException;
23 import java.io.StringWriter;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Random;
29 import java.util.TreeMap;
30
31 import org.apache.helix.ZNRecord;
32 import org.apache.helix.PropertyKey.Builder;
33 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
34 import org.apache.helix.manager.zk.ZNRecordSerializer;
35 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
36 import org.apache.helix.manager.zk.ZkClient;
37 import org.apache.helix.model.ExternalView;
38 import org.apache.helix.model.Message;
39 import org.apache.helix.model.IdealState.IdealStateProperty;
40 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
41 import org.apache.helix.model.Message.MessageState;
42 import org.apache.helix.model.Message.MessageType;
43 import org.apache.helix.util.HelixUtil;
44 import org.codehaus.jackson.JsonGenerationException;
45 import org.codehaus.jackson.map.JsonMappingException;
46 import org.codehaus.jackson.map.ObjectMapper;
47
48
49 public class MockController
50 {
51 private final ZkClient client;
52 private final String srcName;
53 private final String clusterName;
54
55 public MockController(String src, String zkServer, String cluster)
56 {
57 srcName = src;
58 clusterName = cluster;
59 client = new ZkClient(zkServer);
60 client.setZkSerializer(new ZNRecordSerializer());
61 }
62
63 void sendMessage(String msgId, String instanceName, String fromState,
64 String toState, String partitionKey, int partitionId)
65 throws InterruptedException, JsonGenerationException,
66 JsonMappingException, IOException
67 {
68 Message message = new Message(MessageType.STATE_TRANSITION, msgId);
69 message.setMsgId(msgId);
70 message.setSrcName(srcName);
71 message.setTgtName(instanceName);
72 message.setMsgState(MessageState.NEW);
73 message.setFromState(fromState);
74 message.setToState(toState);
75
76 message.setPartitionName(partitionKey);
77
78 String path = HelixUtil.getMessagePath(clusterName, instanceName) + "/"
79 + message.getId();
80 ObjectMapper mapper = new ObjectMapper();
81 StringWriter sw = new StringWriter();
82 mapper.writeValueUsingView(sw, message, Message.class);
83 System.out.println(sw.toString());
84 client.delete(path);
85
86 Thread.sleep(10000);
87 ZNRecord record = client.readData(HelixUtil.getLiveInstancePath(clusterName,
88 instanceName));
89 message.setTgtSessionId(record.getSimpleField(
90 LiveInstanceProperty.SESSION_ID.toString()).toString());
91 client.createPersistent(path, message);
92 }
93
94 public void createExternalView(List<String> instanceNames, int partitions,
95 int replicas, String dbName, long randomSeed)
96 {
97 ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(client));
98 Builder keyBuilder = accessor.keyBuilder();
99
100 ExternalView externalView = new ExternalView(computeRoutingTable(instanceNames, partitions,
101 replicas, dbName, randomSeed));
102
103 accessor.setProperty(keyBuilder.externalView(dbName), externalView);
104 }
105
106 public ZNRecord computeRoutingTable(List<String> instanceNames,
107 int partitions, int replicas, String dbName, long randomSeed)
108 {
109 assert (instanceNames.size() > replicas);
110 Collections.sort(instanceNames);
111
112 ZNRecord result = new ZNRecord(dbName);
113
114 Map<String, Object> externalView = new TreeMap<String, Object>();
115
116 List<Integer> partitionList = new ArrayList<Integer>(partitions);
117 for (int i = 0; i < partitions; i++)
118 {
119 partitionList.add(new Integer(i));
120 }
121 Random rand = new Random(randomSeed);
122
123 Collections.shuffle(partitionList, rand);
124
125 for (int i = 0; i < partitionList.size(); i++)
126 {
127 int partitionId = partitionList.get(i);
128 Map<String, String> partitionAssignment = new TreeMap<String, String>();
129 int masterNode = i % instanceNames.size();
130
131 partitionAssignment.put(instanceNames.get(masterNode), "MASTER");
132
133
134
135 for (int j = 1; j <= replicas; j++)
136 {
137 partitionAssignment
138 .put(instanceNames.get((masterNode + j) % instanceNames.size()),
139 "SLAVE");
140 }
141 String partitionName = dbName + ".partition-" + partitionId;
142 result.setMapField(partitionName, partitionAssignment);
143 }
144 result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), "" + partitions);
145 return result;
146 }
147 }