View Javadoc

1   package org.apache.helix.mock.controller;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.StringWriter;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Random;
29  import java.util.TreeMap;
30  
31  import org.apache.helix.ZNRecord;
32  import org.apache.helix.PropertyKey.Builder;
33  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
34  import org.apache.helix.manager.zk.ZNRecordSerializer;
35  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
36  import org.apache.helix.manager.zk.ZkClient;
37  import org.apache.helix.model.ExternalView;
38  import org.apache.helix.model.Message;
39  import org.apache.helix.model.IdealState.IdealStateProperty;
40  import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
41  import org.apache.helix.model.Message.MessageState;
42  import org.apache.helix.model.Message.MessageType;
43  import org.apache.helix.util.HelixUtil;
44  import org.codehaus.jackson.JsonGenerationException;
45  import org.codehaus.jackson.map.JsonMappingException;
46  import org.codehaus.jackson.map.ObjectMapper;
47  
48  
49  public class MockController
50  {
51    private final ZkClient client;
52    private final String srcName;
53    private final String clusterName;
54  
55    public MockController(String src, String zkServer, String cluster)
56    {
57      srcName = src;
58      clusterName = cluster;
59      client = new ZkClient(zkServer);
60      client.setZkSerializer(new ZNRecordSerializer());
61    }
62  
63    void sendMessage(String msgId, String instanceName, String fromState,
64        String toState, String partitionKey, int partitionId)
65        throws InterruptedException, JsonGenerationException,
66        JsonMappingException, IOException
67    {
68      Message message = new Message(MessageType.STATE_TRANSITION, msgId);
69      message.setMsgId(msgId);
70      message.setSrcName(srcName);
71      message.setTgtName(instanceName);
72      message.setMsgState(MessageState.NEW);
73      message.setFromState(fromState);
74      message.setToState(toState);
75      // message.setPartitionId(partitionId);
76      message.setPartitionName(partitionKey);
77  
78      String path = HelixUtil.getMessagePath(clusterName, instanceName) + "/"
79          + message.getId();
80      ObjectMapper mapper = new ObjectMapper();
81      StringWriter sw = new StringWriter();
82      mapper.writeValueUsingView(sw, message, Message.class);
83      System.out.println(sw.toString());
84      client.delete(path);
85  
86      Thread.sleep(10000);
87      ZNRecord record = client.readData(HelixUtil.getLiveInstancePath(clusterName,
88          instanceName));
89      message.setTgtSessionId(record.getSimpleField(
90          LiveInstanceProperty.SESSION_ID.toString()).toString());
91      client.createPersistent(path, message);
92    }
93  
94    public void createExternalView(List<String> instanceNames, int partitions,
95        int replicas, String dbName, long randomSeed)
96    {
97      ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(client));
98      Builder keyBuilder = accessor.keyBuilder();
99  
100     ExternalView externalView = new ExternalView(computeRoutingTable(instanceNames, partitions,
101                                                                      replicas, dbName, randomSeed));
102 
103     accessor.setProperty(keyBuilder.externalView(dbName), externalView);
104   }
105 
106   public ZNRecord computeRoutingTable(List<String> instanceNames,
107       int partitions, int replicas, String dbName, long randomSeed)
108   {
109     assert (instanceNames.size() > replicas);
110     Collections.sort(instanceNames);
111 
112     ZNRecord result = new ZNRecord(dbName);
113 
114     Map<String, Object> externalView = new TreeMap<String, Object>();
115 
116     List<Integer> partitionList = new ArrayList<Integer>(partitions);
117     for (int i = 0; i < partitions; i++)
118     {
119       partitionList.add(new Integer(i));
120     }
121     Random rand = new Random(randomSeed);
122     // Shuffle the partition list
123     Collections.shuffle(partitionList, rand);
124 
125     for (int i = 0; i < partitionList.size(); i++)
126     {
127       int partitionId = partitionList.get(i);
128       Map<String, String> partitionAssignment = new TreeMap<String, String>();
129       int masterNode = i % instanceNames.size();
130       // the first in the list is the node that contains the master
131       partitionAssignment.put(instanceNames.get(masterNode), "MASTER");
132 
133       // for the jth replica, we put it on (masterNode + j) % nodes-th
134       // node
135       for (int j = 1; j <= replicas; j++)
136       {
137         partitionAssignment
138             .put(instanceNames.get((masterNode + j) % instanceNames.size()),
139                 "SLAVE");
140       }
141       String partitionName = dbName + ".partition-" + partitionId;
142       result.setMapField(partitionName, partitionAssignment);
143     }
144     result.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(), "" + partitions);
145     return result;
146   }
147 }