d0e8569b967988c6a93e61779f1412afeac58864
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertTrue;
26 import static org.mockito.Mockito.doThrow;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.never;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
32
33 import java.io.IOException;
34 import java.net.MalformedURLException;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import org.junit.After;
38 import org.junit.Before;
39 import org.junit.Test;
40 import org.mockito.invocation.InvocationOnMock;
41 import org.mockito.stubbing.Answer;
42 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
43 import org.onap.policy.common.endpoints.event.comm.TopicListener;
44 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
45 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
46
47 public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
48     private Thread thread;
49     private BusConsumer cons;
50     private TopicListener listener;
51     private SingleThreadedBusTopicSourceImpl source;
52
53     /**
54      * Creates the object to be tested, as well as various mocks.
55      */
56     @Before
57     public void setUp() {
58         super.setUp();
59
60         thread = mock(Thread.class);
61         cons = mock(BusConsumer.class);
62         listener = mock(TopicListener.class);
63         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
64     }
65
66     @After
67     public void tearDown() {
68         source.shutdown();
69     }
70
71     @Test
72     public void testRegister() {
73         source.register(listener);
74         assertEquals(1, source.initCount);
75         source.offer(MY_MESSAGE);
76         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
77
78         // register another - should not re-init
79         TopicListener listener2 = mock(TopicListener.class);
80         source.register(listener2);
81         assertEquals(1, source.initCount);
82         source.offer(MY_MESSAGE + "z");
83         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
84         verify(listener2).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
85
86         // re-register - should not re-init
87         source.register(listener);
88         assertEquals(1, source.initCount);
89         source.offer(MY_MESSAGE2);
90         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
91
92         // lock & register - should not init
93         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
94         source.lock();
95         source.register(listener);
96         assertEquals(0, source.initCount);
97
98         // exception during init
99         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
100         source.initEx = true;
101         source.register(listener);
102     }
103
104     @Test
105     public void testUnregister() {
106         TopicListener listener2 = mock(TopicListener.class);
107         source.register(listener);
108         source.register(listener2);
109
110         // unregister first listener - should NOT invoke close
111         source.unregister(listener);
112         verify(cons, never()).close();
113         assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
114
115         // unregister same listener - should not invoke close
116         source.unregister(listener);
117         verify(cons, never()).close();
118         assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
119
120         // unregister second listener - SHOULD invoke close
121         source.unregister(listener2);
122         verify(cons).close();
123         assertTrue(source.snapshotTopicListeners().isEmpty());
124
125         // unregister same listener - should not invoke close again
126         source.unregister(listener2);
127         verify(cons).close();
128         assertTrue(source.snapshotTopicListeners().isEmpty());
129     }
130
131     @Test
132     public void testToString() {
133         assertTrue(source.toString().startsWith("SingleThreadedBusTopicSource ["));
134     }
135
136     @Test
137     public void testMakePollerThread() {
138         SingleThreadedBusTopicSource source2 = new SingleThreadedBusTopicSource(makeBuilder().build()) {
139             @Override
140             public CommInfrastructure getTopicCommInfrastructure() {
141                 return CommInfrastructure.NOOP;
142             }
143
144             @Override
145             public void init() throws MalformedURLException {
146                 // do nothing
147             }
148         };
149         
150         assertNotNull(source2.makePollerThread());
151     }
152
153     @Test
154     public void testSingleThreadedBusTopicSource() {
155         // verify that different wrappers can be built
156         new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build());
157         new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build());
158         new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchTimeout(-1).build());
159         new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchLimit(-1).build());
160     }
161
162     @Test
163     public void testStart() {
164         source.start();
165         assertTrue(source.isAlive());
166         assertEquals(1, source.initCount);
167         verify(thread).start();
168
169         // attempt to start again - nothing should be invoked again
170         source.start();
171         assertTrue(source.isAlive());
172         assertEquals(1, source.initCount);
173         verify(thread).start();
174
175         // stop & re-start
176         source.stop();
177         source.start();
178         assertTrue(source.isAlive());
179         assertEquals(2, source.initCount);
180         verify(thread, times(2)).start();
181     }
182
183     @Test(expected = IllegalStateException.class)
184     public void testStart_Locked() {
185         source.lock();
186         source.start();
187     }
188
189     @Test(expected = IllegalStateException.class)
190     public void testStart_InitEx() {
191         source.initEx = true;
192         source.start();
193     }
194
195     @Test
196     public void testStop() {
197         source.start();
198         source.stop();
199         verify(cons).close();
200
201         // stop it again - not re-closed
202         source.stop();
203         verify(cons).close();
204
205         // start & stop again, but with an exception
206         doThrow(new RuntimeException(EXPECTED)).when(cons).close();
207         source.start();
208         source.stop();
209     }
210
211     @Test
212     public void testRun() throws Exception {
213         source.register(listener);
214
215         /*
216          * Die in the middle of fetching messages. Also, throw an exception during the
217          * first fetch attempt.
218          */
219         when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
220             int count = 0;
221
222             @Override
223             public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
224                 if (++count > 1) {
225                     source.alive = false;
226                     return Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
227
228                 } else {
229                     throw new IOException(EXPECTED);
230                 }
231             }
232         });
233         source.alive = true;
234         source.run();
235         assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
236         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
237         verify(listener, never()).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
238
239         /*
240          * Die AFTER fetching messages.
241          */
242         final String msga = "message-A";
243         final String msgb = "message-B";
244         when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
245             int count = 0;
246
247             @Override
248             public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
249                 if (++count > 1) {
250                     source.alive = false;
251                     return Collections.emptyList();
252
253                 } else {
254                     return Arrays.asList(msga, msgb);
255                 }
256             }
257         });
258         source.alive = true;
259         source.run();
260         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msga);
261         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msgb);
262
263         assertEquals(Arrays.asList(MY_MESSAGE, msga, msgb), Arrays.asList(source.getRecentEvents()));
264     }
265
266     @Test
267     public void testOffer() {
268         source.register(listener);
269         source.offer(MY_MESSAGE);
270         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
271         assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
272     }
273
274     @Test(expected = IllegalStateException.class)
275     public void testOffer_NotStarted() {
276         source.offer(MY_MESSAGE);
277     }
278
279     @Test
280     public void testSetFilter() {
281         FilterableBusConsumer filt = mock(FilterableBusConsumer.class);
282         cons = filt;
283
284         source.start();
285         source.setFilter("my-filter");
286         verify(filt).setFilter("my-filter");
287     }
288
289     @Test(expected = UnsupportedOperationException.class)
290     public void testSetFilter_Unsupported() {
291         source.start();
292         source.setFilter("unsupported-filter");
293     }
294
295     @Test
296     public void testGetConsumerGroup() {
297         assertEquals(MY_CONS_GROUP, source.getConsumerGroup());
298     }
299
300     @Test
301     public void testGetConsumerInstance() {
302         assertEquals(MY_CONS_INST, source.getConsumerInstance());
303     }
304
305     @Test
306     public void testShutdown() {
307         source.register(listener);
308
309         source.shutdown();
310         verify(cons).close();
311         assertTrue(source.snapshotTopicListeners().isEmpty());
312     }
313
314     @Test
315     public void testGetFetchTimeout() {
316         assertEquals(MY_FETCH_TIMEOUT, source.getFetchTimeout());
317     }
318
319     @Test
320     public void testGetFetchLimit() {
321         assertEquals(MY_FETCH_LIMIT, source.getFetchLimit());
322     }
323
324     /**
325      * Implementation of SingleThreadedBusTopicSource that counts the number of times
326      * init() is invoked.
327      */
328     private class SingleThreadedBusTopicSourceImpl extends SingleThreadedBusTopicSource {
329
330         private int initCount = 0;
331         private boolean initEx = false;
332
333         public SingleThreadedBusTopicSourceImpl(BusTopicParams busTopicParams) {
334             super(busTopicParams);
335         }
336
337         @Override
338         public CommInfrastructure getTopicCommInfrastructure() {
339             return CommInfrastructure.NOOP;
340         }
341
342         @Override
343         public void init() throws MalformedURLException {
344             ++initCount;
345
346             if (initEx) {
347                 throw new MalformedURLException(EXPECTED);
348             }
349
350             consumer = cons;
351         }
352
353         @Override
354         protected Thread makePollerThread() {
355             return thread;
356         }
357
358     }
359 }