1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.Date;
23 import java.util.concurrent.atomic.AtomicReference;
24
25 import org.apache.helix.NotificationContext;
26 import org.apache.helix.TestHelper;
27 import org.apache.helix.controller.HelixControllerMain;
28 import org.apache.helix.mock.participant.MockParticipant;
29 import org.apache.helix.mock.participant.MockTransition;
30 import org.apache.helix.model.Message;
31 import org.apache.helix.tools.ClusterStateVerifier;
32 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
33 import org.testng.Assert;
34 import org.testng.annotations.Test;
35
36
37 public class TestRestartParticipant extends ZkIntegrationTestBase
38 {
39 public class KillOtherTransition extends MockTransition
40 {
41 final AtomicReference<MockParticipant> _other;
42
43 public KillOtherTransition(MockParticipant other)
44 {
45 _other = new AtomicReference<MockParticipant>(other);
46 }
47
48 @Override
49 public void doTransition(Message message, NotificationContext context)
50 {
51 MockParticipant other = _other.getAndSet(null);
52 if (other != null)
53 {
54 System.err.println("Kill " + other.getInstanceName()
55 + ". Interrupted exceptions are IGNORABLE");
56 other.syncStop();
57 }
58 }
59 }
60
61 @Test()
62 public void testRestartParticipant() throws Exception
63 {
64
65 System.out.println("START testRestartParticipant at "
66 + new Date(System.currentTimeMillis()));
67
68 String clusterName = getShortClassName();
69 MockParticipant[] participants = new MockParticipant[5];
70
71 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918,
72 "localhost",
73 "TestDB",
74 1,
75 10,
76 5,
77 3,
78 "MasterSlave",
79 true);
80
81 TestHelper.startController(clusterName,
82 "controller_0",
83 ZK_ADDR,
84 HelixControllerMain.STANDALONE);
85
86 for (int i = 0; i < 5; i++)
87 {
88 String instanceName = "localhost_" + (12918 + i);
89
90 if (i == 4)
91 {
92 participants[i] =
93 new MockParticipant(clusterName,
94 instanceName,
95 ZK_ADDR,
96 new KillOtherTransition(participants[0]));
97 }
98 else
99 {
100 participants[i] =
101 new MockParticipant(clusterName,
102 instanceName,
103 ZK_ADDR,
104 null);
105
106 }
107
108 participants[i].syncStart();
109 }
110
111 boolean result =
112 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
113 clusterName));
114 Assert.assertTrue(result);
115
116
117 Thread.sleep(500);
118 MockParticipant participant =
119 new MockParticipant(participants[0].getClusterName(),
120 participants[0].getInstanceName(),
121 ZK_ADDR,
122 null);
123 System.err.println("Restart " + participant.getInstanceName());
124 participant.syncStart();
125 result =
126 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
127 clusterName));
128 Assert.assertTrue(result);
129
130 System.out.println("START testRestartParticipant at "
131 + new Date(System.currentTimeMillis()));
132
133 }
134 }