View Javadoc

1   package org.apache.helix.integration;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.List;
23  import java.util.Map;
24  import java.util.Set;
25  
26  import org.apache.helix.PropertyKey.Builder;
27  import org.apache.helix.manager.zk.ZKHelixDataAccessor;
28  import org.apache.helix.manager.zk.ZNRecordSerializer;
29  import org.apache.helix.manager.zk.ZkBaseDataAccessor;
30  import org.apache.helix.manager.zk.ZkClient;
31  import org.apache.helix.model.ExternalView;
32  import org.apache.helix.util.StatusUpdateUtil;
33  import org.testng.Assert;
34  
35  
36  public class TestStatusUpdate extends ZkStandAloneCMTestBase
37  {
38    // For now write participant StatusUpdates to log4j.
39    // TODO: Need to investigate another data channel to report to controller and re-enable
40    // this test
41    // @Test
42    public void testParticipantStatusUpdates() throws Exception
43    {
44      ZkClient zkClient = new ZkClient(ZkIntegrationTestBase.ZK_ADDR);
45      zkClient.setZkSerializer(new ZNRecordSerializer());
46      ZKHelixDataAccessor accessor =
47          new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(zkClient));
48      Builder keyBuilder = accessor.keyBuilder();
49  
50      List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
51      Assert.assertNotNull(extViews);
52  
53      for (ExternalView extView : extViews)
54      {
55        String resourceName = extView.getResourceName();
56        Set<String> partitionSet = extView.getPartitionSet();
57        for (String partition : partitionSet)
58        {
59          Map<String, String> stateMap = extView.getStateMap(partition);
60          for (String instance : stateMap.keySet())
61          {
62            String state = stateMap.get(instance);
63            StatusUpdateUtil.StatusUpdateContents statusUpdates =
64                StatusUpdateUtil.StatusUpdateContents.getStatusUpdateContents(accessor,
65                                                                              instance,
66                                                                              resourceName,
67                                                                              partition);
68  
69            Map<String, StatusUpdateUtil.TaskStatus> taskMessages =
70                statusUpdates.getTaskMessages();
71            List<StatusUpdateUtil.Transition> transitions = statusUpdates.getTransitions();
72            if (state.equals("MASTER"))
73            {
74              Assert.assertEquals(transitions.size() >= 2,
75                                  true,
76                                  "Invalid number of transitions");
77              StatusUpdateUtil.Transition lastTransition =
78                  transitions.get(transitions.size() - 1);
79              StatusUpdateUtil.Transition prevTransition =
80                  transitions.get(transitions.size() - 2);
81              Assert.assertEquals(taskMessages.get(lastTransition.getMsgID()),
82                                  StatusUpdateUtil.TaskStatus.COMPLETED,
83                                  "Incomplete transition");
84              Assert.assertEquals(taskMessages.get(prevTransition.getMsgID()),
85                                  StatusUpdateUtil.TaskStatus.COMPLETED,
86                                  "Incomplete transition");
87              Assert.assertEquals(lastTransition.getFromState(), "SLAVE", "Invalid State");
88              Assert.assertEquals(lastTransition.getToState(), "MASTER", "Invalid State");
89            }
90            else if (state.equals("SLAVE"))
91            {
92              Assert.assertEquals(transitions.size() >= 1,
93                                  true,
94                                  "Invalid number of transitions");
95              StatusUpdateUtil.Transition lastTransition =
96                  transitions.get(transitions.size() - 1);
97              Assert.assertEquals(lastTransition.getFromState().equals("MASTER")
98                                      || lastTransition.getFromState().equals("OFFLINE"),
99                                  true,
100                                 "Invalid transition");
101             Assert.assertEquals(lastTransition.getToState(), "SLAVE", "Invalid State");
102           }
103         }
104       }
105     }
106   }
107 }