1a5506de48d489fad001207ca77e5e7da37e7c15
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Mockito.doThrow;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.never;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33
34 import java.io.IOException;
35 import java.net.MalformedURLException;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.mockito.invocation.InvocationOnMock;
42 import org.mockito.stubbing.Answer;
43 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
44 import org.onap.policy.common.endpoints.event.comm.TopicListener;
45 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
46 import org.onap.policy.common.utils.gson.GsonTestUtils;
47
48 public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
49     private Thread thread;
50     private BusConsumer cons;
51     private TopicListener listener;
52     private SingleThreadedBusTopicSourceImpl source;
53
54     /**
55      * Creates the object to be tested, as well as various mocks.
56      */
57     @Before
58     @Override
59     public void setUp() {
60         super.setUp();
61
62         thread = mock(Thread.class);
63         cons = mock(BusConsumer.class);
64         listener = mock(TopicListener.class);
65         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
66     }
67
68     @After
69     public void tearDown() {
70         source.shutdown();
71     }
72
73     @Test
74     public void testSerialize() {
75         assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedBusTopicSourceTest.class))
76                         .doesNotThrowAnyException();
77     }
78
79     @Test
80     public void testRegister() {
81         source.register(listener);
82         assertEquals(1, source.initCount);
83         source.offer(MY_MESSAGE);
84         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
85
86         // register another - should not re-init
87         TopicListener listener2 = mock(TopicListener.class);
88         source.register(listener2);
89         assertEquals(1, source.initCount);
90         source.offer(MY_MESSAGE + "z");
91         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
92         verify(listener2).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
93
94         // re-register - should not re-init
95         source.register(listener);
96         assertEquals(1, source.initCount);
97         source.offer(MY_MESSAGE2);
98         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
99
100         // lock & register - should not init
101         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
102         source.lock();
103         source.register(listener);
104         assertEquals(0, source.initCount);
105
106         // exception during init
107         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
108         source.initEx = true;
109         source.register(listener);
110     }
111
112     @Test
113     public void testUnregister() {
114         TopicListener listener2 = mock(TopicListener.class);
115         source.register(listener);
116         source.register(listener2);
117
118         // unregister first listener - should NOT invoke close
119         source.unregister(listener);
120         verify(cons, never()).close();
121         assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
122
123         // unregister same listener - should not invoke close
124         source.unregister(listener);
125         verify(cons, never()).close();
126         assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
127
128         // unregister second listener - SHOULD invoke close
129         source.unregister(listener2);
130         verify(cons).close();
131         assertTrue(source.snapshotTopicListeners().isEmpty());
132
133         // unregister same listener - should not invoke close again
134         source.unregister(listener2);
135         verify(cons).close();
136         assertTrue(source.snapshotTopicListeners().isEmpty());
137     }
138
139     @Test
140     public void testToString() {
141         assertTrue(source.toString().startsWith("SingleThreadedBusTopicSource ["));
142     }
143
144     @Test
145     public void testMakePollerThread() {
146         SingleThreadedBusTopicSource source2 = new SingleThreadedBusTopicSource(makeBuilder().build()) {
147             @Override
148             public CommInfrastructure getTopicCommInfrastructure() {
149                 return CommInfrastructure.NOOP;
150             }
151
152             @Override
153             public void init() throws MalformedURLException {
154                 // do nothing
155             }
156         };
157
158         assertNotNull(source2.makePollerThread());
159     }
160
161     @Test
162     public void testSingleThreadedBusTopicSource() {
163         // verify that different wrappers can be built
164         new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build());
165         new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build());
166         new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchTimeout(-1).build());
167         assertThatCode(() -> new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchLimit(-1).build()))
168                         .doesNotThrowAnyException();
169     }
170
171     @Test
172     public void testStart() {
173         source.start();
174         assertTrue(source.isAlive());
175         assertEquals(1, source.initCount);
176         verify(thread).start();
177
178         // attempt to start again - nothing should be invoked again
179         source.start();
180         assertTrue(source.isAlive());
181         assertEquals(1, source.initCount);
182         verify(thread).start();
183
184         // stop & re-start
185         source.stop();
186         source.start();
187         assertTrue(source.isAlive());
188         assertEquals(2, source.initCount);
189         verify(thread, times(2)).start();
190     }
191
192     @Test(expected = IllegalStateException.class)
193     public void testStart_Locked() {
194         source.lock();
195         source.start();
196     }
197
198     @Test(expected = IllegalStateException.class)
199     public void testStart_InitEx() {
200         source.initEx = true;
201         source.start();
202     }
203
204     @Test
205     public void testStop() {
206         source.start();
207         source.stop();
208         verify(cons).close();
209
210         // stop it again - not re-closed
211         source.stop();
212         verify(cons).close();
213
214         // start & stop again, but with an exception
215         doThrow(new RuntimeException(EXPECTED)).when(cons).close();
216         source.start();
217         source.stop();
218     }
219
220     @Test
221     public void testRun() throws Exception {
222         source.register(listener);
223
224         /*
225          * Die in the middle of fetching messages. Also, throw an exception during the
226          * first fetch attempt.
227          */
228         when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
229             int count = 0;
230
231             @Override
232             public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
233                 if (++count > 1) {
234                     source.alive = false;
235                     return Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
236
237                 } else {
238                     throw new IOException(EXPECTED);
239                 }
240             }
241         });
242         source.alive = true;
243         source.run();
244         assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
245         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
246         verify(listener, never()).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
247
248         /*
249          * Die AFTER fetching messages.
250          */
251         final String msga = "message-A";
252         final String msgb = "message-B";
253         when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
254             int count = 0;
255
256             @Override
257             public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
258                 if (++count > 1) {
259                     source.alive = false;
260                     return Collections.emptyList();
261
262                 } else {
263                     return Arrays.asList(msga, msgb);
264                 }
265             }
266         });
267         source.alive = true;
268         source.run();
269         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msga);
270         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msgb);
271
272         assertEquals(Arrays.asList(MY_MESSAGE, msga, msgb), Arrays.asList(source.getRecentEvents()));
273     }
274
275     @Test
276     public void testOffer() {
277         source.register(listener);
278         source.offer(MY_MESSAGE);
279         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
280         assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
281     }
282
283     @Test(expected = IllegalStateException.class)
284     public void testOffer_NotStarted() {
285         source.offer(MY_MESSAGE);
286     }
287
288     @Test
289     public void testGetConsumerGroup() {
290         assertEquals(MY_CONS_GROUP, source.getConsumerGroup());
291     }
292
293     @Test
294     public void testGetConsumerInstance() {
295         assertEquals(MY_CONS_INST, source.getConsumerInstance());
296     }
297
298     @Test
299     public void testShutdown() {
300         source.register(listener);
301
302         source.shutdown();
303         verify(cons).close();
304         assertTrue(source.snapshotTopicListeners().isEmpty());
305     }
306
307     @Test
308     public void testGetFetchTimeout() {
309         assertEquals(MY_FETCH_TIMEOUT, source.getFetchTimeout());
310     }
311
312     @Test
313     public void testGetFetchLimit() {
314         assertEquals(MY_FETCH_LIMIT, source.getFetchLimit());
315     }
316
317     /**
318      * Implementation of SingleThreadedBusTopicSource that counts the number of times
319      * init() is invoked.
320      */
321     private class SingleThreadedBusTopicSourceImpl extends SingleThreadedBusTopicSource {
322
323         private int initCount = 0;
324         private boolean initEx = false;
325
326         public SingleThreadedBusTopicSourceImpl(BusTopicParams busTopicParams) {
327             super(busTopicParams);
328         }
329
330         @Override
331         public CommInfrastructure getTopicCommInfrastructure() {
332             return CommInfrastructure.NOOP;
333         }
334
335         @Override
336         public void init() throws MalformedURLException {
337             ++initCount;
338
339             if (initEx) {
340                 throw new MalformedURLException(EXPECTED);
341             }
342
343             consumer = cons;
344         }
345
346         @Override
347         protected Thread makePollerThread() {
348             return thread;
349         }
350
351     }
352 }