View Javadoc

1   package org.apache.helix.messaging;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.UUID;
25  
26  import org.apache.helix.Criteria;
27  import org.apache.helix.HelixDataAccessor;
28  import org.apache.helix.HelixManager;
29  import org.apache.helix.HelixProperty;
30  import org.apache.helix.InstanceType;
31  import org.apache.helix.Mocks;
32  import org.apache.helix.NotificationContext;
33  import org.apache.helix.PropertyKey;
34  import org.apache.helix.PropertyType;
35  import org.apache.helix.ZNRecord;
36  import org.apache.helix.messaging.handling.HelixTaskResult;
37  import org.apache.helix.messaging.handling.MessageHandler;
38  import org.apache.helix.messaging.handling.MessageHandlerFactory;
39  import org.apache.helix.model.ExternalView;
40  import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
41  import org.apache.helix.model.Message;
42  import org.apache.helix.tools.DefaultIdealStateCalculator;
43  import org.testng.AssertJUnit;
44  import org.testng.annotations.Test;
45  
46  
47  public class TestDefaultMessagingService
48  {
49    class MockHelixManager extends Mocks.MockManager
50    {
51      class MockDataAccessor extends Mocks.MockAccessor
52      {
53        
54        @Override
55        public <T extends HelixProperty> T getProperty(PropertyKey key)
56        {
57          
58          PropertyType type = key.getType();
59          if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
60          {
61            return (T) new ExternalView(_externalView);
62          }
63          return null;
64        }
65  
66        @Override
67        public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
68        {
69          PropertyType type = key.getType();
70          List<T> result = new ArrayList<T>();
71          Class<? extends HelixProperty> clazz = key.getTypeClass();
72          if(type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES)
73          {
74            HelixProperty typedInstance = HelixProperty.convertToTypedInstance(clazz, _externalView);
75            result.add((T) typedInstance);
76            return result;
77          }
78          else if(type == PropertyType.LIVEINSTANCES)
79          {
80            return (List<T>) HelixProperty.convertToTypedList(clazz, _liveInstances);
81          }
82  
83          return result;
84        }
85      }
86  
87      HelixDataAccessor _accessor = new MockDataAccessor();
88      ZNRecord _externalView;
89      List<String> _instances;
90      List<ZNRecord> _liveInstances;
91      String _db = "DB";
92      int _replicas = 3;
93      int _partitions = 50;
94  
95      public MockHelixManager()
96      {
97        _liveInstances = new ArrayList<ZNRecord>();
98        _instances = new ArrayList<String>();
99        for(int i = 0;i<5; i++)
100       {
101         String instance = "localhost_"+(12918+i);
102         _instances.add(instance);
103         ZNRecord metaData = new ZNRecord(instance);
104         metaData.setSimpleField(LiveInstanceProperty.SESSION_ID.toString(),
105             UUID.randomUUID().toString());
106         _liveInstances.add(metaData);
107       }
108       _externalView = DefaultIdealStateCalculator.calculateIdealState(
109           _instances, _partitions, _replicas, _db, "MASTER", "SLAVE");
110 
111     }
112 
113     @Override
114     public boolean isConnected()
115     {
116       return true;
117     }
118 
119     @Override
120     public HelixDataAccessor getHelixDataAccessor()
121     {
122       return _accessor;
123     }
124 
125 
126     @Override
127     public String getInstanceName()
128     {
129       return "localhost_12919";
130     }
131 
132     @Override
133     public InstanceType getInstanceType()
134     {
135       return InstanceType.PARTICIPANT;
136     }
137   }
138 
139   class TestMessageHandlerFactory implements MessageHandlerFactory
140   {
141     class TestMessageHandler extends MessageHandler
142     {
143 
144       public TestMessageHandler(Message message, NotificationContext context)
145       {
146         super(message, context);
147         // TODO Auto-generated constructor stub
148       }
149 
150       @Override
151       public HelixTaskResult handleMessage() throws InterruptedException
152       {
153         HelixTaskResult result = new HelixTaskResult();
154         result.setSuccess(true);
155         return result;
156       }
157 
158       @Override
159       public void onError( Exception e, ErrorCode code, ErrorType type)
160       {
161         // TODO Auto-generated method stub
162         
163       }
164     }
165     @Override
166     public MessageHandler createHandler(Message message,
167         NotificationContext context)
168     {
169       // TODO Auto-generated method stub
170       return new TestMessageHandler(message, context);
171     }
172 
173     @Override
174     public String getMessageType()
175     {
176       // TODO Auto-generated method stub
177       return "TestingMessageHandler";
178     }
179 
180     @Override
181     public void reset()
182     {
183       // TODO Auto-generated method stub
184 
185     }
186   }
187 
188   @Test()
189   public void TestMessageSend()
190   {
191     HelixManager manager = new MockHelixManager();
192     DefaultMessagingService svc = new DefaultMessagingService(manager);
193     TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
194     svc.registerMessageHandlerFactory(factory.getMessageType(), factory);
195 
196     Criteria recipientCriteria = new Criteria();
197     recipientCriteria.setInstanceName("localhost_12919");
198     recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
199     recipientCriteria.setSelfExcluded(true);
200 
201     Message template = new Message(factory.getMessageType(), UUID.randomUUID().toString());
202     AssertJUnit.assertEquals(0, svc.send(recipientCriteria, template));
203 
204     recipientCriteria.setSelfExcluded(false);
205     AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
206 
207 
208     recipientCriteria.setSelfExcluded(false);
209     recipientCriteria.setInstanceName("%");
210     recipientCriteria.setResource("DB");
211     recipientCriteria.setPartition("%");
212     AssertJUnit.assertEquals(200, svc.send(recipientCriteria, template));
213 
214     recipientCriteria.setSelfExcluded(true);
215     recipientCriteria.setInstanceName("%");
216     recipientCriteria.setResource("DB");
217     recipientCriteria.setPartition("%");
218     AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
219 
220     recipientCriteria.setSelfExcluded(true);
221     recipientCriteria.setInstanceName("%");
222     recipientCriteria.setResource("DB");
223     recipientCriteria.setPartition("%");
224     AssertJUnit.assertEquals(159, svc.send(recipientCriteria, template));
225 
226     recipientCriteria.setSelfExcluded(true);
227     recipientCriteria.setInstanceName("localhost_12920");
228     recipientCriteria.setResource("DB");
229     recipientCriteria.setPartition("%");
230     AssertJUnit.assertEquals(39, svc.send(recipientCriteria, template));
231 
232 
233     recipientCriteria.setSelfExcluded(true);
234     recipientCriteria.setInstanceName("localhost_12920");
235     recipientCriteria.setRecipientInstanceType(InstanceType.CONTROLLER);
236     recipientCriteria.setResource("DB");
237     recipientCriteria.setPartition("%");
238     AssertJUnit.assertEquals(1, svc.send(recipientCriteria, template));
239   }
240 }