1 package org.apache.helix.integration;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.List;
25
26 import org.apache.helix.TestHelper;
27 import org.apache.helix.ZNRecord;
28 import org.apache.helix.PropertyKey.Builder;
29 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
30 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
31 import org.apache.helix.mock.controller.ClusterController;
32 import org.apache.helix.mock.participant.MockParticipant;
33 import org.apache.helix.mock.participant.MockMSModelFactory;
34 import org.apache.helix.model.CurrentState;
35 import org.apache.helix.model.ExternalView;
36 import org.apache.helix.model.IdealState;
37 import org.apache.helix.model.Message;
38 import org.apache.helix.tools.ClusterSetup;
39 import org.apache.helix.tools.ClusterStateVerifier;
40 import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
41 import org.testng.Assert;
42 import org.testng.annotations.Test;
43
44
45 public class TestAddStateModelFactoryAfterConnect extends ZkIntegrationTestBase
46 {
47 @Test
48 public void testBasic() throws Exception
49 {
50
51 String className = TestHelper.getTestClassName();
52 String methodName = TestHelper.getTestMethodName();
53 String clusterName = className + "_" + methodName;
54 final int n = 5;
55
56 System.out.println("START " + clusterName + " at "
57 + new Date(System.currentTimeMillis()));
58
59 MockParticipant[] participants = new MockParticipant[n];
60
61 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918,
62 "localhost",
63 "TestDB",
64 1,
65 10,
66 n,
67 3,
68 "MasterSlave",
69 true);
70
71 ClusterController controller =
72 new ClusterController(clusterName, "controller_0", ZK_ADDR);
73 controller.syncStart();
74
75
76 for (int i = 0; i < n; i++)
77 {
78 String instanceName = "localhost_" + (12918 + i);
79
80 participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
81 participants[i].syncStart();
82 }
83
84 boolean result =
85 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
86 clusterName));
87 Assert.assertTrue(result);
88
89
90 ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
91 setupTool.addResourceToCluster(clusterName, "TestDB1", 16, "MasterSlave");
92
93 ZkBaseDataAccessor<ZNRecord> baseAccessor =
94 new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
95 ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
96 Builder keyBuilder = accessor.keyBuilder();
97 IdealState idealState = accessor.getProperty(keyBuilder.idealStates("TestDB1"));
98 idealState.setStateModelFactoryName("TestDB1_Factory");
99 accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
100 setupTool.rebalanceStorageCluster(clusterName, "TestDB1", 3);
101
102
103 int totalMsgs = 0;
104 for (int retry = 0; retry < 5; retry++) {
105 Thread.sleep(100);
106 totalMsgs = 0;
107 for (int i = 0; i < n; i++) {
108 List<Message> msgs = accessor.getChildValues(keyBuilder.messages(participants[i].getInstanceName()));
109 totalMsgs += msgs.size();
110 }
111
112 if (totalMsgs == 48)
113 break;
114 }
115
116 Assert.assertEquals(totalMsgs, 48,
117 "Should accumulated 48 unprocessed messages (1 O->S per partition per replica) because TestDB1 is added without state-model-factory but was " + totalMsgs);
118
119
120
121 for (int i = 0; i < n; i++)
122 {
123 participants[i].getManager()
124 .getStateMachineEngine()
125 .registerStateModelFactory("MasterSlave",
126 new MockMSModelFactory(),
127 "TestDB1_Factory");
128 }
129
130 result =
131 ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
132 clusterName));
133 Assert.assertTrue(result);
134
135
136
137 Thread.sleep(1000);
138 controller.syncStop();
139 for (int i = 0; i < 5; i++)
140 {
141 participants[i].syncStop();
142 }
143
144 System.out.println("END " + clusterName + " at "
145 + new Date(System.currentTimeMillis()));
146
147 }
148 }