View Javadoc

1   package org.apache.helix.messaging.handling;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.HashSet;
23  import java.util.concurrent.ThreadPoolExecutor;
24  
25  import org.apache.helix.ConfigAccessor;
26  import org.apache.helix.model.ConfigScope;
27  import org.apache.helix.model.builder.ConfigScopeBuilder;
28  import org.apache.helix.HelixManager;
29  import org.apache.helix.NotificationContext;
30  import org.apache.helix.integration.ZkStandAloneCMTestBase;
31  import org.apache.helix.messaging.DefaultMessagingService;
32  import org.apache.helix.model.Message;
33  import org.testng.Assert;
34  import org.testng.annotations.Test;
35  
36  
37  public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase
38  {
39    public static class TestMessagingHandlerFactory implements MessageHandlerFactory
40    {
41      public static HashSet<String> _processedMsgIds = new HashSet<String>();
42      
43      @Override
44      public MessageHandler createHandler(Message message,
45          NotificationContext context)
46      {
47        return null;
48      }
49      
50      @Override
51      public String getMessageType()
52      {
53        return "TestMsg";
54      }
55      
56      @Override
57      public void reset()
58      {
59        // TODO Auto-generated method stub
60      }
61      
62    }
63    
64    public static class TestMessagingHandlerFactory2 implements MessageHandlerFactory
65    {
66      public static HashSet<String> _processedMsgIds = new HashSet<String>();
67      
68      @Override
69      public MessageHandler createHandler(Message message,
70          NotificationContext context)
71      {
72        return null;
73      }
74      
75      @Override
76      public String getMessageType()
77      {
78        return "TestMsg2";
79      }
80      
81      @Override
82      public void reset()
83      {
84        // TODO Auto-generated method stub
85      }
86      
87    }
88    @Test
89    public void TestThreadPoolSizeConfig()
90    {
91      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
92      HelixManager manager = _startCMResultMap.get(instanceName)._manager;
93      
94      ConfigAccessor accessor = manager.getConfigAccessor();
95      ConfigScope scope =
96          new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(instanceName).build();
97      accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+12);
98      
99      scope =
100         new ConfigScopeBuilder().forCluster(manager.getClusterName()).build();
101     accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+8);
102     
103     for (int i = 0; i < NODE_NR; i++)
104     {
105       instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
106       
107       _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg", new TestMessagingHandlerFactory());
108       _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg2", new TestMessagingHandlerFactory2());
109       
110     
111     }
112     
113     for (int i = 0; i < NODE_NR; i++)
114     {
115       instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
116       
117       DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
118       HelixTaskExecutor helixExecutor = svc.getExecutor();
119       ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._executorMap.get("TestMsg"));
120       
121       ThreadPoolExecutor executor2 = (ThreadPoolExecutor)(helixExecutor._executorMap.get("TestMsg2"));
122       if(i != 0)
123       {
124         
125         Assert.assertEquals(8, executor.getMaximumPoolSize());
126       }
127       else
128       {
129         Assert.assertEquals(12, executor.getMaximumPoolSize());
130       }
131       Assert.assertEquals(HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, executor2.getMaximumPoolSize());
132     }
133   }
134 }