View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  
24  import org.apache.helix.HelixManager;
25  import org.apache.helix.NotificationContext;
26  import org.apache.helix.TestHelper;
27  import org.apache.helix.HelixConstants.ChangeType;
28  import org.apache.helix.NotificationContext.Type;
29  import org.apache.helix.PropertyKey.Builder;
30  import org.apache.helix.controller.HelixControllerMain;
31  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
32  import org.apache.helix.manager.zk.ZNRecordSerializer;
33  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
34  import org.apache.helix.manager.zk.ZkClient;
35  import org.apache.helix.mock.participant.MockJobIntf;
36  import org.apache.helix.mock.participant.MockParticipant;
37  import org.apache.helix.model.LiveInstance;
38  import org.apache.helix.participant.CustomCodeCallbackHandler;
39  import org.apache.helix.participant.HelixCustomCodeRunner;
40  import org.apache.helix.tools.ClusterStateVerifier;
41  import org.testng.Assert;
42  import org.testng.annotations.Test;
43  
44  
45  public class TestHelixCustomCodeRunner extends ZkIntegrationTestBase
46  {
47    private final String _clusterName = "CLUSTER_" + getShortClassName();
48    private final int _nodeNb = 5;
49    private final int _startPort = 12918;
50    private final MockCallback _callback = new MockCallback();
51  
52    class MockCallback implements CustomCodeCallbackHandler
53    {
54      boolean _isCallbackInvoked;
55  
56      @Override
57      public void onCallback(NotificationContext context)
58      {
59        HelixManager manager = context.getManager();
60        Type type = context.getType();
61        _isCallbackInvoked = true;
62  //      System.out.println(type + ": TestCallback invoked on " + manager.getInstanceName());
63      }
64  
65    }
66  
67    class MockJob implements MockJobIntf
68    {
69      @Override
70      public void doPreConnectJob(HelixManager manager)
71      {
72        try
73        {
74          // delay the start of the 1st participant
75          //  so there will be a leadership transfer from localhost_12919 to 12918
76          if (manager.getInstanceName().equals("localhost_12918"))
77          {
78            Thread.sleep(2000);
79          }
80  
81          HelixCustomCodeRunner customCodeRunner = new HelixCustomCodeRunner(manager, ZK_ADDR);
82          customCodeRunner.invoke(_callback)
83                           .on(ChangeType.LIVE_INSTANCE)
84                           .usingLeaderStandbyModel("TestParticLeader")
85                           .start();
86        } catch (Exception e)
87        {
88          // TODO Auto-generated catch block
89          e.printStackTrace();
90        }
91      }
92  
93      @Override
94      public void doPostConnectJob(HelixManager manager)
95      {
96        // TODO Auto-generated method stub
97  
98      }
99  
100   }
101 
102   @Test
103   public void testCustomCodeRunner() throws Exception
104   {
105     System.out.println("START " + _clusterName + " at " + new Date(System.currentTimeMillis()));
106 
107     TestHelper.setupCluster(_clusterName,
108                             ZK_ADDR,
109                             _startPort,
110                             "localhost",  // participant name prefix
111                             "TestDB",     // resource name prefix
112                             1,  // resourceNb
113                             5,  // partitionNb
114                             _nodeNb,  // nodesNb
115                             _nodeNb,  // replica
116                             "MasterSlave",
117                             true);
118 
119     TestHelper.startController(_clusterName,
120                                "controller_0",
121                                ZK_ADDR,
122                                HelixControllerMain.STANDALONE);
123 
124     MockParticipant[] partics = new MockParticipant[5];
125     for (int i = 0; i < _nodeNb; i++)
126     {
127       String instanceName =  "localhost_" + (_startPort + i);
128 
129       partics[i] = new MockParticipant(_clusterName, instanceName, ZK_ADDR,
130                         null, new MockJob());
131       partics[i].syncStart();
132 //      new Thread(partics[i]).start();
133     }
134     boolean result = ClusterStateVerifier.verifyByPolling(
135         new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
136     Assert.assertTrue(result);
137 
138     Thread.sleep(1000);  // wait for the INIT type callback to finish
139     Assert.assertTrue(_callback._isCallbackInvoked);
140     _callback._isCallbackInvoked = false;
141 
142     // add a new live instance
143     ZkClient zkClient = new ZkClient(ZK_ADDR);
144     zkClient.setZkSerializer(new ZNRecordSerializer());
145     ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(zkClient));
146     Builder keyBuilder = accessor.keyBuilder();
147 
148     LiveInstance newLiveIns = new LiveInstance("newLiveInstance");
149     newLiveIns.setHelixVersion("0.0.0");
150     newLiveIns.setSessionId("randomSessionId");
151     accessor.setProperty(keyBuilder.liveInstance("newLiveInstance"), newLiveIns);
152 
153     Thread.sleep(1000);  // wait for the CALLBACK type callback to finish
154     Assert.assertTrue(_callback._isCallbackInvoked);
155 
156     System.out.println("END " + _clusterName + " at " + new Date(System.currentTimeMillis()));
157   }
158 }