16d74df25f3baf6eff0a3b6ea0148ea5043bb432
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertNotNull;
25 import static org.junit.Assert.assertTrue;
26 import static org.mockito.Mockito.doThrow;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.never;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
32
33 import java.io.IOException;
34 import java.net.MalformedURLException;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import org.junit.After;
38 import org.junit.Before;
39 import org.junit.Test;
40 import org.mockito.invocation.InvocationOnMock;
41 import org.mockito.stubbing.Answer;
42 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
43 import org.onap.policy.common.endpoints.event.comm.TopicListener;
44 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
45 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
46 import org.onap.policy.common.utils.gson.GsonTestUtils;
47
48 public class SingleThreadedBusTopicSourceTest extends TopicTestBase {
49     private Thread thread;
50     private BusConsumer cons;
51     private TopicListener listener;
52     private SingleThreadedBusTopicSourceImpl source;
53
54     /**
55      * Creates the object to be tested, as well as various mocks.
56      */
57     @Before
58     @Override
59     public void setUp() {
60         super.setUp();
61
62         thread = mock(Thread.class);
63         cons = mock(BusConsumer.class);
64         listener = mock(TopicListener.class);
65         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
66     }
67
68     @After
69     public void tearDown() {
70         source.shutdown();
71     }
72
73     @Test
74     public void testSerialize() {
75         new GsonTestUtils().compareGson(source, SingleThreadedBusTopicSourceTest.class);
76     }
77
78     @Test
79     public void testRegister() {
80         source.register(listener);
81         assertEquals(1, source.initCount);
82         source.offer(MY_MESSAGE);
83         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
84
85         // register another - should not re-init
86         TopicListener listener2 = mock(TopicListener.class);
87         source.register(listener2);
88         assertEquals(1, source.initCount);
89         source.offer(MY_MESSAGE + "z");
90         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
91         verify(listener2).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
92
93         // re-register - should not re-init
94         source.register(listener);
95         assertEquals(1, source.initCount);
96         source.offer(MY_MESSAGE2);
97         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
98
99         // lock & register - should not init
100         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
101         source.lock();
102         source.register(listener);
103         assertEquals(0, source.initCount);
104
105         // exception during init
106         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
107         source.initEx = true;
108         source.register(listener);
109     }
110
111     @Test
112     public void testUnregister() {
113         TopicListener listener2 = mock(TopicListener.class);
114         source.register(listener);
115         source.register(listener2);
116
117         // unregister first listener - should NOT invoke close
118         source.unregister(listener);
119         verify(cons, never()).close();
120         assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
121
122         // unregister same listener - should not invoke close
123         source.unregister(listener);
124         verify(cons, never()).close();
125         assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
126
127         // unregister second listener - SHOULD invoke close
128         source.unregister(listener2);
129         verify(cons).close();
130         assertTrue(source.snapshotTopicListeners().isEmpty());
131
132         // unregister same listener - should not invoke close again
133         source.unregister(listener2);
134         verify(cons).close();
135         assertTrue(source.snapshotTopicListeners().isEmpty());
136     }
137
138     @Test
139     public void testToString() {
140         assertTrue(source.toString().startsWith("SingleThreadedBusTopicSource ["));
141     }
142
143     @Test
144     public void testMakePollerThread() {
145         SingleThreadedBusTopicSource source2 = new SingleThreadedBusTopicSource(makeBuilder().build()) {
146             @Override
147             public CommInfrastructure getTopicCommInfrastructure() {
148                 return CommInfrastructure.NOOP;
149             }
150
151             @Override
152             public void init() throws MalformedURLException {
153                 // do nothing
154             }
155         };
156
157         assertNotNull(source2.makePollerThread());
158     }
159
160     @Test
161     public void testSingleThreadedBusTopicSource() {
162         // verify that different wrappers can be built
163         new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build());
164         new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build());
165         new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchTimeout(-1).build());
166         new SingleThreadedBusTopicSourceImpl(makeBuilder().fetchLimit(-1).build());
167     }
168
169     @Test
170     public void testStart() {
171         source.start();
172         assertTrue(source.isAlive());
173         assertEquals(1, source.initCount);
174         verify(thread).start();
175
176         // attempt to start again - nothing should be invoked again
177         source.start();
178         assertTrue(source.isAlive());
179         assertEquals(1, source.initCount);
180         verify(thread).start();
181
182         // stop & re-start
183         source.stop();
184         source.start();
185         assertTrue(source.isAlive());
186         assertEquals(2, source.initCount);
187         verify(thread, times(2)).start();
188     }
189
190     @Test(expected = IllegalStateException.class)
191     public void testStart_Locked() {
192         source.lock();
193         source.start();
194     }
195
196     @Test(expected = IllegalStateException.class)
197     public void testStart_InitEx() {
198         source.initEx = true;
199         source.start();
200     }
201
202     @Test
203     public void testStop() {
204         source.start();
205         source.stop();
206         verify(cons).close();
207
208         // stop it again - not re-closed
209         source.stop();
210         verify(cons).close();
211
212         // start & stop again, but with an exception
213         doThrow(new RuntimeException(EXPECTED)).when(cons).close();
214         source.start();
215         source.stop();
216     }
217
218     @Test
219     public void testRun() throws Exception {
220         source.register(listener);
221
222         /*
223          * Die in the middle of fetching messages. Also, throw an exception during the
224          * first fetch attempt.
225          */
226         when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
227             int count = 0;
228
229             @Override
230             public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
231                 if (++count > 1) {
232                     source.alive = false;
233                     return Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
234
235                 } else {
236                     throw new IOException(EXPECTED);
237                 }
238             }
239         });
240         source.alive = true;
241         source.run();
242         assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
243         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
244         verify(listener, never()).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
245
246         /*
247          * Die AFTER fetching messages.
248          */
249         final String msga = "message-A";
250         final String msgb = "message-B";
251         when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
252             int count = 0;
253
254             @Override
255             public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
256                 if (++count > 1) {
257                     source.alive = false;
258                     return Collections.emptyList();
259
260                 } else {
261                     return Arrays.asList(msga, msgb);
262                 }
263             }
264         });
265         source.alive = true;
266         source.run();
267         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msga);
268         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msgb);
269
270         assertEquals(Arrays.asList(MY_MESSAGE, msga, msgb), Arrays.asList(source.getRecentEvents()));
271     }
272
273     @Test
274     public void testOffer() {
275         source.register(listener);
276         source.offer(MY_MESSAGE);
277         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
278         assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
279     }
280
281     @Test(expected = IllegalStateException.class)
282     public void testOffer_NotStarted() {
283         source.offer(MY_MESSAGE);
284     }
285
286     @Test
287     public void testSetFilter() {
288         FilterableBusConsumer filt = mock(FilterableBusConsumer.class);
289         cons = filt;
290
291         source.start();
292         source.setFilter("my-filter");
293         verify(filt).setFilter("my-filter");
294     }
295
296     @Test(expected = UnsupportedOperationException.class)
297     public void testSetFilter_Unsupported() {
298         source.start();
299         source.setFilter("unsupported-filter");
300     }
301
302     @Test
303     public void testGetConsumerGroup() {
304         assertEquals(MY_CONS_GROUP, source.getConsumerGroup());
305     }
306
307     @Test
308     public void testGetConsumerInstance() {
309         assertEquals(MY_CONS_INST, source.getConsumerInstance());
310     }
311
312     @Test
313     public void testShutdown() {
314         source.register(listener);
315
316         source.shutdown();
317         verify(cons).close();
318         assertTrue(source.snapshotTopicListeners().isEmpty());
319     }
320
321     @Test
322     public void testGetFetchTimeout() {
323         assertEquals(MY_FETCH_TIMEOUT, source.getFetchTimeout());
324     }
325
326     @Test
327     public void testGetFetchLimit() {
328         assertEquals(MY_FETCH_LIMIT, source.getFetchLimit());
329     }
330
331     /**
332      * Implementation of SingleThreadedBusTopicSource that counts the number of times
333      * init() is invoked.
334      */
335     private class SingleThreadedBusTopicSourceImpl extends SingleThreadedBusTopicSource {
336
337         private int initCount = 0;
338         private boolean initEx = false;
339
340         public SingleThreadedBusTopicSourceImpl(BusTopicParams busTopicParams) {
341             super(busTopicParams);
342         }
343
344         @Override
345         public CommInfrastructure getTopicCommInfrastructure() {
346             return CommInfrastructure.NOOP;
347         }
348
349         @Override
350         public void init() throws MalformedURLException {
351             ++initCount;
352
353             if (initEx) {
354                 throw new MalformedURLException(EXPECTED);
355             }
356
357             consumer = cons;
358         }
359
360         @Override
361         protected Thread makePollerThread() {
362             return thread;
363         }
364
365     }
366 }