View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.Date;
23  import java.util.concurrent.atomic.AtomicReference;
24  
25  import org.apache.helix.NotificationContext;
26  import org.apache.helix.TestHelper;
27  import org.apache.helix.controller.HelixControllerMain;
28  import org.apache.helix.mock.participant.MockParticipant;
29  import org.apache.helix.mock.participant.MockTransition;
30  import org.apache.helix.model.Message;
31  import org.apache.helix.tools.ClusterStateVerifier;
32  import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
33  import org.testng.Assert;
34  import org.testng.annotations.Test;
35  
36  
37  public class TestRestartParticipant extends ZkIntegrationTestBase
38  {
39    public class KillOtherTransition extends MockTransition
40    {
41      final AtomicReference<MockParticipant> _other;
42  
43      public KillOtherTransition(MockParticipant other)
44      {
45        _other = new AtomicReference<MockParticipant>(other);
46      }
47  
48      @Override
49      public void doTransition(Message message, NotificationContext context)
50      {
51        MockParticipant other = _other.getAndSet(null);
52        if (other != null)
53        {
54          System.err.println("Kill " + other.getInstanceName()
55              + ". Interrupted exceptions are IGNORABLE");
56          other.syncStop();
57        }
58      }
59    }
60  
61    @Test()
62    public void testRestartParticipant() throws Exception
63    {
64      // Logger.getRootLogger().setLevel(Level.INFO);
65      System.out.println("START testRestartParticipant at "
66          + new Date(System.currentTimeMillis()));
67  
68      String clusterName = getShortClassName();
69      MockParticipant[] participants = new MockParticipant[5];
70  
71      TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
72                              "localhost", // participant name prefix
73                              "TestDB", // resource name prefix
74                              1, // resources
75                              10, // partitions per resource
76                              5, // number of nodes
77                              3, // replicas
78                              "MasterSlave",
79                              true); // do rebalance
80  
81      TestHelper.startController(clusterName,
82                                 "controller_0",
83                                 ZK_ADDR,
84                                 HelixControllerMain.STANDALONE);
85      // start participants
86      for (int i = 0; i < 5; i++)
87      {
88        String instanceName = "localhost_" + (12918 + i);
89  
90        if (i == 4)
91        {
92          participants[i] =
93              new MockParticipant(clusterName,
94                                  instanceName,
95                                  ZK_ADDR,
96                                  new KillOtherTransition(participants[0]));
97        }
98        else
99        {
100         participants[i] =
101             new MockParticipant(clusterName,
102                                 instanceName,
103                                 ZK_ADDR,
104                                 null);
105 //        Thread.sleep(100);
106       }
107 
108       participants[i].syncStart();
109     }
110 
111     boolean result =
112         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
113                                                                                  clusterName));
114     Assert.assertTrue(result);
115 
116     // restart
117     Thread.sleep(500);
118     MockParticipant participant =
119         new MockParticipant(participants[0].getClusterName(),
120                             participants[0].getInstanceName(),
121                             ZK_ADDR,
122                             null);
123     System.err.println("Restart " + participant.getInstanceName());
124     participant.syncStart();
125     result =
126         ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
127                                                                                  clusterName));
128     Assert.assertTrue(result);
129 
130     System.out.println("START testRestartParticipant at "
131         + new Date(System.currentTimeMillis()));
132 
133   }
134 }