1 package org.apache.helix.messaging.handling;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.HashSet;
23 import java.util.concurrent.ThreadPoolExecutor;
24
25 import org.apache.helix.ConfigAccessor;
26 import org.apache.helix.model.ConfigScope;
27 import org.apache.helix.model.builder.ConfigScopeBuilder;
28 import org.apache.helix.HelixManager;
29 import org.apache.helix.NotificationContext;
30 import org.apache.helix.integration.ZkStandAloneCMTestBase;
31 import org.apache.helix.messaging.DefaultMessagingService;
32 import org.apache.helix.model.Message;
33 import org.testng.Assert;
34 import org.testng.annotations.Test;
35
36
37 public class TestConfigThreadpoolSize extends ZkStandAloneCMTestBase
38 {
39 public static class TestMessagingHandlerFactory implements MessageHandlerFactory
40 {
41 public static HashSet<String> _processedMsgIds = new HashSet<String>();
42
43 @Override
44 public MessageHandler createHandler(Message message,
45 NotificationContext context)
46 {
47 return null;
48 }
49
50 @Override
51 public String getMessageType()
52 {
53 return "TestMsg";
54 }
55
56 @Override
57 public void reset()
58 {
59
60 }
61
62 }
63
64 public static class TestMessagingHandlerFactory2 implements MessageHandlerFactory
65 {
66 public static HashSet<String> _processedMsgIds = new HashSet<String>();
67
68 @Override
69 public MessageHandler createHandler(Message message,
70 NotificationContext context)
71 {
72 return null;
73 }
74
75 @Override
76 public String getMessageType()
77 {
78 return "TestMsg2";
79 }
80
81 @Override
82 public void reset()
83 {
84
85 }
86
87 }
88 @Test
89 public void TestThreadPoolSizeConfig()
90 {
91 String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
92 HelixManager manager = _startCMResultMap.get(instanceName)._manager;
93
94 ConfigAccessor accessor = manager.getConfigAccessor();
95 ConfigScope scope =
96 new ConfigScopeBuilder().forCluster(manager.getClusterName()).forParticipant(instanceName).build();
97 accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+12);
98
99 scope =
100 new ConfigScopeBuilder().forCluster(manager.getClusterName()).build();
101 accessor.set(scope, "TestMsg."+ HelixTaskExecutor.MAX_THREADS, ""+8);
102
103 for (int i = 0; i < NODE_NR; i++)
104 {
105 instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
106
107 _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg", new TestMessagingHandlerFactory());
108 _startCMResultMap.get(instanceName)._manager.getMessagingService().registerMessageHandlerFactory("TestMsg2", new TestMessagingHandlerFactory2());
109
110
111 }
112
113 for (int i = 0; i < NODE_NR; i++)
114 {
115 instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
116
117 DefaultMessagingService svc = (DefaultMessagingService)(_startCMResultMap.get(instanceName)._manager.getMessagingService());
118 HelixTaskExecutor helixExecutor = svc.getExecutor();
119 ThreadPoolExecutor executor = (ThreadPoolExecutor)(helixExecutor._executorMap.get("TestMsg"));
120
121 ThreadPoolExecutor executor2 = (ThreadPoolExecutor)(helixExecutor._executorMap.get("TestMsg2"));
122 if(i != 0)
123 {
124
125 Assert.assertEquals(8, executor.getMaximumPoolSize());
126 }
127 else
128 {
129 Assert.assertEquals(12, executor.getMaximumPoolSize());
130 }
131 Assert.assertEquals(HelixTaskExecutor.DEFAULT_PARALLEL_TASKS, executor2.getMaximumPoolSize());
132 }
133 }
134 }