1 package org.apache.helix.messaging;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.UUID;
25
26 import org.apache.helix.Criteria;
27 import org.apache.helix.HelixDataAccessor;
28 import org.apache.helix.HelixManager;
29 import org.apache.helix.HelixProperty;
30 import org.apache.helix.InstanceType;
31 import org.apache.helix.Mocks;
32 import org.apache.helix.NotificationContext;
33 import org.apache.helix.PropertyKey;
34 import org.apache.helix.PropertyType;
35 import org.apache.helix.ZNRecord;
36 import org.apache.helix.messaging.handling.HelixTaskResult;
37 import org.apache.helix.messaging.handling.MessageHandler;
38 import org.apache.helix.messaging.handling.MessageHandlerFactory;
39 import org.apache.helix.model.ExternalView;
40 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
41 import org.apache.helix.model.Message;
42 import org.apache.helix.tools.DefaultIdealStateCalculator;
43 import org.testng.AssertJUnit;
44 import org.testng.annotations.Test;
45
46
47 public class TestDefaultMessagingService
48 {
49 class MockHelixManager extends Mocks.MockManager
50 {
51 class MockDataAccessor extends Mocks.MockAccessor
52 {
53
54 @Override
55 public <T extends HelixProperty> T getProperty(PropertyKey key)
56 {
57
58 PropertyType type = key.getType();
59 if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
60 {
61 return (T) new ExternalView(_externalView);
62 }
63 return null;
64 }
65
66 @Override
67 public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
68 {
69 PropertyType type = key.getType();
70 List<T> result = new ArrayList<T>();
71 Class<? extends HelixProperty> clazz = key.getTypeClass();
72 if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
73 {
74 HelixProperty typedInstance = HelixProperty.convertToTypedInstance(clazz, _externalView);
75 result.add((T) typedInstance);
76 return result;
77 }
78 else if(type == PropertyType.LIVEINSTANCES)
79 {
80 return (List<T>) HelixProperty.convertToTypedList(clazz, _liveInstances);
81 }
82
83 return result;
84 }
85 }
86
87 HelixDataAccessor _accessor = new MockDataAccessor();
88 ZNRecord _externalView;
89 List<String> _instances;
90 List<ZNRecord> _liveInstances;
91 String _db = "DB";
92 int _replicas = 3;
93 int _partitions = 50;
94
95 public MockHelixManager()
96 {
97 _liveInstances = new ArrayList<ZNRecord>();
98 _instances = new ArrayList<String>();
99 for(int i = 0;i<5; i++)
100 {
101 String instance = "localhost_"+(12918+i);
102 _instances.add(instance);
103 ZNRecord metaData = new ZNRecord(instance);
104 metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
105 UUID.randomUUID().toString());
106 _liveInstances.add(metaData);
107 }
108 _externalView = DefaultIdealStateCalculator.calculateIdealState(
109 _instances, _partitions, _replicas, _db, "MASTER", "SLAVE");
110
111 }
112
113 @Override
114 public boolean isConnected()
115 {
116 return true;
117 }
118
119 @Override
120 public HelixDataAccessor getHelixDataAccessor()
121 {
122 return _accessor;
123 }
124
125
126 @Override
127 public String getInstanceName()
128 {
129 return "localhost_12919";
130 }
131
132 @Override
133 public InstanceType getInstanceType()
134 {
135 return InstanceType.PARTICIPANT;
136 }
137 }
138
139 class TestMessageHandlerFactory implements MessageHandlerFactory
140 {
141 class TestMessageHandler extends MessageHandler
142 {
143
144 public TestMessageHandler(Message message, NotificationContext context)
145 {
146 super(message, context);
147
148 }
149
150 @Override
151 public HelixTaskResult handleMessage() throws InterruptedException
152 {
153 HelixTaskResult result = new HelixTaskResult();
154 result.setSuccess(true);
155 return result;
156 }
157
158 @Override
159 public void onError( Exception e, ErrorCode code, ErrorType type)
160 {
161
162
163 }
164 }
165 @Override
166 public MessageHandler createHandler(Message message,
167 NotificationContext context)
168 {
169
170 return new TestMessageHandler(message, context);
171 }
172
173 @Override
174 public String getMessageType()
175 {
176
177 return "TestingMessageHandler";
178 }
179
180 @Override
181 public void reset()
182 {
183
184
185 }
186 }
187
188 @Test()
189 public void TestMessageSend()
190 {
191 HelixManager manager = new MockHelixManager();
192 DefaultMessagingService svc = new DefaultMessagingService(manager);
193 TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
194 svc.registerMessageHandlerFactory(factory.getMessageType(), factory);
195
196 Criteria recipientCriteria = new Criteria();
197 recipientCriteria.setInstanceName("localhost_12919");
198 recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
199 recipientCriteria.setSelfExcluded(true);
200
201 Message template = new Message(factory.getMessageType(), UUID.randomUUID().toString());
202 AssertJUnit.assertEquals(0, svc.send(recipientCriteria, template));
203
204 recipientCriteria.setSelfExcluded(false);
205 AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
206
207
208 recipientCriteria.setSelfExcluded(false);
209 recipientCriteria.setInstanceName("%");
210 recipientCriteria.setResource("DB");
211 recipientCriteria.setPartition("%");
212 AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
213
214 recipientCriteria.setSelfExcluded(true);
215 recipientCriteria.setInstanceName("%");
216 recipientCriteria.setResource("DB");
217 recipientCriteria.setPartition("%");
218 AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
219
220 recipientCriteria.setSelfExcluded(true);
221 recipientCriteria.setInstanceName("%");
222 recipientCriteria.setResource("DB");
223 recipientCriteria.setPartition("%");
224 AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
225
226 recipientCriteria.setSelfExcluded(true);
227 recipientCriteria.setInstanceName("localhost_12920");
228 recipientCriteria.setResource("DB");
229 recipientCriteria.setPartition("%");
230 AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
231
232
233 recipientCriteria.setSelfExcluded(true);
234 recipientCriteria.setInstanceName("localhost_12920");
235 recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
236 recipientCriteria.setResource("DB");
237 recipientCriteria.setPartition("%");
238 AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
239 }
240 }