Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-pooling-messages / src / test / java / org / onap / policy / drools / pooling / TopicMessageManagerTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling;
23
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertFalse;
28 import static org.junit.jupiter.api.Assertions.assertThrows;
29 import static org.junit.jupiter.api.Assertions.assertTrue;
30 import static org.mockito.ArgumentMatchers.any;
31 import static org.mockito.Mockito.doThrow;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.never;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
36
37 import java.util.Arrays;
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.concurrent.CountDownLatch;
41 import org.junit.jupiter.api.BeforeEach;
42 import org.junit.jupiter.api.Test;
43 import org.onap.policy.common.endpoints.event.comm.TopicListener;
44 import org.onap.policy.common.endpoints.event.comm.TopicSink;
45 import org.onap.policy.common.endpoints.event.comm.TopicSource;
46
47 class TopicMessageManagerTest {
48
49     private static final String EXPECTED = "expected";
50     private static final String MY_TOPIC = "my.topic";
51     private static final String MSG = "a message";
52
53     private TopicListener listener;
54     private TopicSource source;
55     private boolean gotSources;
56     private TopicSink sink;
57     private boolean gotSinks;
58     private TopicMessageManager mgr;
59
60     /**
61      * Setup.
62      *
63      * @throws Exception throws an exception
64      */
65     @BeforeEach
66     public void setUp() throws Exception {
67         listener = mock(TopicListener.class);
68         source = mock(TopicSource.class);
69         gotSources = false;
70         sink = mock(TopicSink.class);
71         gotSinks = false;
72
73         when(source.getTopic()).thenReturn(MY_TOPIC);
74
75         when(sink.getTopic()).thenReturn(MY_TOPIC);
76         when(sink.send(any())).thenReturn(true);
77
78         mgr = new TopicMessageManagerImpl(MY_TOPIC);
79     }
80
81     @Test
82     void testTopicMessageManager() {
83         // verify that the init methods were called
84         assertTrue(gotSources);
85         assertTrue(gotSinks);
86     }
87
88     @Test
89     void testTopicMessageManager_PoolingEx() {
90         // force error by having no topics match
91         when(source.getTopic()).thenReturn("");
92
93         assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC));
94     }
95
96     @Test
97     void testTopicMessageManager_IllegalArgEx() {
98         // force error
99         assertThrows(PoolingFeatureException.class, () ->
100             new TopicMessageManagerImpl(MY_TOPIC) {
101             @Override
102             protected List<TopicSource> getTopicSources() {
103                 throw new IllegalArgumentException(EXPECTED);
104             }
105         });
106     }
107
108     @Test
109     void testGetTopic() {
110         assertEquals(MY_TOPIC, mgr.getTopic());
111     }
112
113     @Test
114     void testFindTopicSource_NotFound() {
115         // one item in list, and its topic doesn't match
116         assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
117             @Override
118             protected List<TopicSource> getTopicSources() {
119                 return Collections.singletonList(mock(TopicSource.class));
120             }
121         });
122     }
123
124     @Test
125     void testFindTopicSource_EmptyList() {
126         // empty list
127         assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
128             @Override
129             protected List<TopicSource> getTopicSources() {
130                 return Collections.emptyList();
131             }
132         });
133     }
134
135     @Test
136     void testFindTopicSink_NotFound() {
137         // one item in list, and its topic doesn't match
138         assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
139             @Override
140             protected List<TopicSink> getTopicSinks() {
141                 return Collections.singletonList(mock(TopicSink.class));
142             }
143         });
144     }
145
146     @Test
147     void testFindTopicSink_EmptyList() {
148         // empty list
149         assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
150             @Override
151             protected List<TopicSink> getTopicSinks() {
152                 return Collections.emptyList();
153             }
154         });
155     }
156
157     @Test
158     void testStartPublisher() throws PoolingFeatureException {
159
160         mgr.startPublisher();
161
162         // restart should have no effect
163         mgr.startPublisher();
164
165         // should be able to publish now
166         mgr.publish(MSG);
167         verify(sink).send(MSG);
168     }
169
170     @Test
171     void testStopPublisher() {
172         // not publishing yet, so stopping should have no effect
173         mgr.stopPublisher(0);
174
175         // now start it
176         mgr.startPublisher();
177
178         // this time, stop should do something
179         mgr.stopPublisher(0);
180
181         // re-stopping should have no effect
182         assertThatCode(() -> mgr.stopPublisher(0)).doesNotThrowAnyException();
183     }
184
185     @Test
186     void testStopPublisher_WithDelay() {
187
188         mgr.startPublisher();
189
190         long tbeg = System.currentTimeMillis();
191
192         mgr.stopPublisher(100L);
193
194         assertTrue(System.currentTimeMillis() >= tbeg + 100L);
195     }
196
197     @Test
198     void testStopPublisher_WithDelayInterrupted() throws Exception {
199
200         mgr.startPublisher();
201
202         long minms = 2000L;
203
204         // tell the publisher to stop in minms + additional time
205         CountDownLatch latch = new CountDownLatch(1);
206         Thread thread = new Thread(() -> {
207             latch.countDown();
208             mgr.stopPublisher(minms + 3000L);
209         });
210         thread.start();
211
212         // wait for the thread to start
213         latch.await();
214
215         // interrupt it - it should immediately finish its work
216         thread.interrupt();
217
218         // wait for it to stop, but only wait the minimum time
219         thread.join(minms);
220
221         assertFalse(thread.isAlive());
222     }
223
224     @Test
225     void testStartConsumer() {
226         // not started yet
227         verify(source, never()).register(any());
228
229         mgr.startConsumer(listener);
230         verify(source).register(listener);
231
232         // restart should have no effect
233         mgr.startConsumer(listener);
234         verify(source).register(listener);
235     }
236
237     @Test
238     void testStopConsumer() {
239         // not consuming yet, so stopping should have no effect
240         mgr.stopConsumer(listener);
241         verify(source, never()).unregister(any());
242
243         // now start it
244         mgr.startConsumer(listener);
245
246         // this time, stop should do something
247         mgr.stopConsumer(listener);
248         verify(source).unregister(listener);
249
250         // re-stopping should have no effect
251         mgr.stopConsumer(listener);
252         verify(source).unregister(listener);
253     }
254
255     @Test
256     void testPublish() throws PoolingFeatureException {
257         // cannot publish before starting
258         assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,pre").isInstanceOf(PoolingFeatureException.class);
259
260         mgr.startPublisher();
261
262         // publish several messages
263         mgr.publish(MSG);
264         verify(sink).send(MSG);
265
266         mgr.publish(MSG + "a");
267         verify(sink).send(MSG + "a");
268
269         mgr.publish(MSG + "b");
270         verify(sink).send(MSG + "b");
271
272         // stop and verify we can no longer publish
273         mgr.stopPublisher(0);
274         assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,stopped").isInstanceOf(PoolingFeatureException.class);
275     }
276
277     @Test
278     void testPublish_SendFailed() {
279         mgr.startPublisher();
280
281         // arrange for send() to fail
282         when(sink.send(MSG)).thenReturn(false);
283
284         assertThrows(PoolingFeatureException.class, () -> mgr.publish(MSG));
285     }
286
287     @Test
288     void testPublish_SendEx() {
289         mgr.startPublisher();
290
291         // arrange for send() to throw an exception
292         doThrow(new IllegalStateException(EXPECTED)).when(sink).send(MSG);
293
294         assertThrows(PoolingFeatureException.class, () -> mgr.publish(MSG));
295     }
296
297     /**
298      * Manager with overrides.
299      */
300     private class TopicMessageManagerImpl extends TopicMessageManager {
301
302         public TopicMessageManagerImpl(String topic) throws PoolingFeatureException {
303             super(topic);
304         }
305
306         @Override
307         protected List<TopicSource> getTopicSources() {
308             gotSources = true;
309
310             // three sources, with the desired one in the middle
311             return Arrays.asList(mock(TopicSource.class), source, mock(TopicSource.class));
312         }
313
314         @Override
315         protected List<TopicSink> getTopicSinks() {
316             gotSinks = true;
317
318             // three sinks, with the desired one in the middle
319             return Arrays.asList(mock(TopicSink.class), sink, mock(TopicSink.class));
320         }
321     }
322 }