bc2d37797be285de89484f0c52491e62781b2b33
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.assertj.core.api.Assertions.assertThatThrownBy;
27 import static org.junit.jupiter.api.Assertions.assertEquals;
28 import static org.junit.jupiter.api.Assertions.assertNotNull;
29 import static org.junit.jupiter.api.Assertions.assertTrue;
30 import static org.mockito.Mockito.doThrow;
31 import static org.mockito.Mockito.mock;
32 import static org.mockito.Mockito.never;
33 import static org.mockito.Mockito.times;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
36
37 import java.io.IOException;
38 import java.net.MalformedURLException;
39 import java.util.Arrays;
40 import java.util.Collections;
41 import org.junit.jupiter.api.AfterEach;
42 import org.junit.jupiter.api.BeforeEach;
43 import org.junit.jupiter.api.Test;
44 import org.mockito.invocation.InvocationOnMock;
45 import org.mockito.stubbing.Answer;
46 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
47 import org.onap.policy.common.endpoints.event.comm.TopicListener;
48 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
49 import org.onap.policy.common.utils.gson.GsonTestUtils;
50 import org.onap.policy.common.utils.network.NetworkUtil;
51
52 class SingleThreadedBusTopicSourceTest extends TopicTestBase {
53     private Thread thread;
54     private BusConsumer cons;
55     private TopicListener listener;
56     private SingleThreadedBusTopicSourceImpl source;
57
58     /**
59      * Creates the object to be tested, as well as various mocks.
60      */
61     @BeforeEach
62     @Override
63     public void setUp() {
64         super.setUp();
65
66         thread = mock(Thread.class);
67         cons = mock(BusConsumer.class);
68         listener = mock(TopicListener.class);
69         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
70     }
71
72     @AfterEach
73     public void tearDown() {
74         source.shutdown();
75     }
76
77     @Test
78     void testSerialize() {
79         assertThatCode(() -> new GsonTestUtils().compareGson(source, SingleThreadedBusTopicSourceTest.class))
80                         .doesNotThrowAnyException();
81     }
82
83     @Test
84     void testRegister() {
85         source.register(listener);
86         assertEquals(1, source.initCount);
87         source.offer(MY_MESSAGE);
88         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
89
90         // register another - should not re-init
91         TopicListener listener2 = mock(TopicListener.class);
92         source.register(listener2);
93         assertEquals(1, source.initCount);
94         source.offer(MY_MESSAGE + "z");
95         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
96         verify(listener2).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE + "z");
97
98         // re-register - should not re-init
99         source.register(listener);
100         assertEquals(1, source.initCount);
101         source.offer(MY_MESSAGE2);
102         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
103
104         // lock & register - should not init
105         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
106         source.lock();
107         source.register(listener);
108         assertEquals(0, source.initCount);
109
110         // exception during init
111         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
112         source.initEx = true;
113         source.register(listener);
114     }
115
116     @Test
117     void testUnregister() {
118         TopicListener listener2 = mock(TopicListener.class);
119         source.register(listener);
120         source.register(listener2);
121
122         // unregister first listener - should NOT invoke close
123         source.unregister(listener);
124         verify(cons, never()).close();
125         assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
126
127         // unregister same listener - should not invoke close
128         source.unregister(listener);
129         verify(cons, never()).close();
130         assertEquals(Arrays.asList(listener2), source.snapshotTopicListeners());
131
132         // unregister second listener - SHOULD invoke close
133         source.unregister(listener2);
134         verify(cons).close();
135         assertTrue(source.snapshotTopicListeners().isEmpty());
136
137         // unregister same listener - should not invoke close again
138         source.unregister(listener2);
139         verify(cons).close();
140         assertTrue(source.snapshotTopicListeners().isEmpty());
141     }
142
143     @Test
144     void testToString() {
145         assertTrue(source.toString().startsWith("SingleThreadedBusTopicSource ["));
146     }
147
148     @Test
149     void testMakePollerThread() {
150         SingleThreadedBusTopicSource source2 = new SingleThreadedBusTopicSource(makeBuilder().build()) {
151             @Override
152             public CommInfrastructure getTopicCommInfrastructure() {
153                 return CommInfrastructure.NOOP;
154             }
155
156             @Override
157             public void init() throws MalformedURLException {
158                 // do nothing
159             }
160         };
161
162         assertNotNull(source2.makePollerThread());
163     }
164
165     @Test
166     void testSingleThreadedBusTopicSource() {
167         // Note: if the value contains "-", it's probably a UUID
168
169         // verify that different wrappers can be built
170         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().build());
171         assertThat(source.getConsumerGroup()).isEqualTo(MY_CONS_GROUP);
172         assertThat(source.getConsumerInstance()).isEqualTo(MY_CONS_INST);
173
174         // group is null => group is UUID, instance is as provided
175         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).build());
176         assertThat(source.getConsumerGroup()).contains("-").isNotEqualTo(NetworkUtil.getHostname());
177         assertThat(source.getConsumerInstance()).isEqualTo(MY_CONS_INST);
178
179         // instance is null => group is as provided, instance is UUID
180         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerInstance(null).build());
181         assertThat(source.getConsumerGroup()).isEqualTo(MY_CONS_GROUP);
182         assertThat(source.getConsumerInstance()).contains("-").isNotEqualTo(NetworkUtil.getHostname());
183
184         // group & instance are null => group is UUID, instance is hostname
185         source = new SingleThreadedBusTopicSourceImpl(makeBuilder().consumerGroup(null).consumerInstance(null).build());
186         assertThat(source.getConsumerGroup()).contains("-").isNotEqualTo(NetworkUtil.getHostname());
187         assertThat(source.getConsumerInstance()).isEqualTo(NetworkUtil.getHostname());
188
189         assertThatCode(() -> new SingleThreadedBusTopicSourceImpl(
190                         makeBuilder().fetchLimit(-1).fetchTimeout(-1).build())).doesNotThrowAnyException();
191     }
192
193     @Test
194     void testStart() {
195         source.start();
196         assertTrue(source.isAlive());
197         assertEquals(1, source.initCount);
198         verify(thread).start();
199
200         // attempt to start again - nothing should be invoked again
201         source.start();
202         assertTrue(source.isAlive());
203         assertEquals(1, source.initCount);
204         verify(thread).start();
205
206         // stop & re-start
207         source.stop();
208         source.start();
209         assertTrue(source.isAlive());
210         assertEquals(2, source.initCount);
211         verify(thread, times(2)).start();
212     }
213
214     @Test
215     void testStart_Locked() {
216         source.lock();
217         assertThatThrownBy(() -> source.start()).isInstanceOf(IllegalStateException.class);
218     }
219
220     @Test
221     void testStart_InitEx() {
222         assertThatThrownBy(() -> {
223             source.initEx = true;
224
225             source.start();
226         }).isInstanceOf(IllegalStateException.class);
227     }
228
229     @Test
230     void testStop() {
231         source.start();
232         source.stop();
233         verify(cons).close();
234
235         // stop it again - not re-closed
236         source.stop();
237         verify(cons).close();
238
239         // start & stop again, but with an exception
240         doThrow(new RuntimeException(EXPECTED)).when(cons).close();
241         source.start();
242         source.stop();
243     }
244
245     @Test
246     void testRun() throws Exception {
247         source.register(listener);
248
249         /*
250          * Die in the middle of fetching messages. Also, throw an exception during the
251          * first fetch attempt.
252          */
253         when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
254             int count = 0;
255
256             @Override
257             public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
258                 if (++count > 1) {
259                     source.alive = false;
260                     return Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
261
262                 } else {
263                     throw new IOException(EXPECTED);
264                 }
265             }
266         });
267         source.alive = true;
268         source.run();
269         assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
270         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
271         verify(listener, never()).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE2);
272
273         /*
274          * Die AFTER fetching messages.
275          */
276         final String msga = "message-A";
277         final String msgb = "message-B";
278         when(cons.fetch()).thenAnswer(new Answer<Iterable<String>>() {
279             int count = 0;
280
281             @Override
282             public Iterable<String> answer(InvocationOnMock invocation) throws Throwable {
283                 if (++count > 1) {
284                     source.alive = false;
285                     return Collections.emptyList();
286
287                 } else {
288                     return Arrays.asList(msga, msgb);
289                 }
290             }
291         });
292         source.alive = true;
293         source.run();
294         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msga);
295         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msgb);
296
297         assertEquals(Arrays.asList(MY_MESSAGE, msga, msgb), Arrays.asList(source.getRecentEvents()));
298     }
299
300     @Test
301     void testOffer() {
302         source.register(listener);
303         source.offer(MY_MESSAGE);
304         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
305         assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(source.getRecentEvents()));
306     }
307
308     @Test
309     void testOffer_NotStarted() {
310         assertThatThrownBy(() -> source.offer(MY_MESSAGE)).isInstanceOf(IllegalStateException.class);
311     }
312
313     @Test
314     void testGetConsumerGroup() {
315         assertEquals(MY_CONS_GROUP, source.getConsumerGroup());
316     }
317
318     @Test
319     void testGetConsumerInstance() {
320         assertEquals(MY_CONS_INST, source.getConsumerInstance());
321     }
322
323     @Test
324     void testShutdown() {
325         source.register(listener);
326
327         source.shutdown();
328         verify(cons).close();
329         assertTrue(source.snapshotTopicListeners().isEmpty());
330     }
331
332     @Test
333     void testGetFetchTimeout() {
334         assertEquals(MY_FETCH_TIMEOUT, source.getFetchTimeout());
335     }
336
337     @Test
338     void testGetFetchLimit() {
339         assertEquals(MY_FETCH_LIMIT, source.getFetchLimit());
340     }
341
342     /**
343      * Implementation of SingleThreadedBusTopicSource that counts the number of times
344      * init() is invoked.
345      */
346     private class SingleThreadedBusTopicSourceImpl extends SingleThreadedBusTopicSource {
347
348         private int initCount = 0;
349         private boolean initEx = false;
350
351         public SingleThreadedBusTopicSourceImpl(BusTopicParams busTopicParams) {
352             super(busTopicParams);
353         }
354
355         @Override
356         public CommInfrastructure getTopicCommInfrastructure() {
357             return CommInfrastructure.NOOP;
358         }
359
360         @Override
361         public void init() throws MalformedURLException {
362             ++initCount;
363
364             if (initEx) {
365                 throw new MalformedURLException(EXPECTED);
366             }
367
368             consumer = cons;
369         }
370
371         @Override
372         protected Thread makePollerThread() {
373             return thread;
374         }
375
376     }
377 }