2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertFalse;
28 import static org.junit.jupiter.api.Assertions.assertThrows;
29 import static org.junit.jupiter.api.Assertions.assertTrue;
30 import static org.mockito.ArgumentMatchers.any;
31 import static org.mockito.Mockito.doThrow;
32 import static org.mockito.Mockito.mock;
33 import static org.mockito.Mockito.never;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
37 import java.util.Arrays;
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.concurrent.CountDownLatch;
41 import org.junit.jupiter.api.BeforeEach;
42 import org.junit.jupiter.api.Test;
43 import org.onap.policy.common.endpoints.event.comm.TopicListener;
44 import org.onap.policy.common.endpoints.event.comm.TopicSink;
45 import org.onap.policy.common.endpoints.event.comm.TopicSource;
47 class TopicMessageManagerTest {
49 private static final String EXPECTED = "expected";
50 private static final String MY_TOPIC = "my.topic";
51 private static final String MSG = "a message";
53 private TopicListener listener;
54 private TopicSource source;
55 private boolean gotSources;
56 private TopicSink sink;
57 private boolean gotSinks;
58 private TopicMessageManager mgr;
63 * @throws Exception throws an exception
66 public void setUp() throws Exception {
67 listener = mock(TopicListener.class);
68 source = mock(TopicSource.class);
70 sink = mock(TopicSink.class);
73 when(source.getTopic()).thenReturn(MY_TOPIC);
75 when(sink.getTopic()).thenReturn(MY_TOPIC);
76 when(sink.send(any())).thenReturn(true);
78 mgr = new TopicMessageManagerImpl(MY_TOPIC);
82 void testTopicMessageManager() {
83 // verify that the init methods were called
84 assertTrue(gotSources);
89 void testTopicMessageManager_PoolingEx() {
90 // force error by having no topics match
91 when(source.getTopic()).thenReturn("");
93 assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC));
97 void testTopicMessageManager_IllegalArgEx() {
99 assertThrows(PoolingFeatureException.class, () ->
100 new TopicMessageManagerImpl(MY_TOPIC) {
102 protected List<TopicSource> getTopicSources() {
103 throw new IllegalArgumentException(EXPECTED);
109 void testGetTopic() {
110 assertEquals(MY_TOPIC, mgr.getTopic());
114 void testFindTopicSource_NotFound() {
115 // one item in list, and its topic doesn't match
116 assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
118 protected List<TopicSource> getTopicSources() {
119 return Collections.singletonList(mock(TopicSource.class));
125 void testFindTopicSource_EmptyList() {
127 assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
129 protected List<TopicSource> getTopicSources() {
130 return Collections.emptyList();
136 void testFindTopicSink_NotFound() {
137 // one item in list, and its topic doesn't match
138 assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
140 protected List<TopicSink> getTopicSinks() {
141 return Collections.singletonList(mock(TopicSink.class));
147 void testFindTopicSink_EmptyList() {
149 assertThrows(PoolingFeatureException.class, () -> new TopicMessageManagerImpl(MY_TOPIC) {
151 protected List<TopicSink> getTopicSinks() {
152 return Collections.emptyList();
158 void testStartPublisher() throws PoolingFeatureException {
160 mgr.startPublisher();
162 // restart should have no effect
163 mgr.startPublisher();
165 // should be able to publish now
167 verify(sink).send(MSG);
171 void testStopPublisher() {
172 // not publishing yet, so stopping should have no effect
173 mgr.stopPublisher(0);
176 mgr.startPublisher();
178 // this time, stop should do something
179 mgr.stopPublisher(0);
181 // re-stopping should have no effect
182 assertThatCode(() -> mgr.stopPublisher(0)).doesNotThrowAnyException();
186 void testStopPublisher_WithDelay() {
188 mgr.startPublisher();
190 long tbeg = System.currentTimeMillis();
192 mgr.stopPublisher(100L);
194 assertTrue(System.currentTimeMillis() >= tbeg + 100L);
198 void testStopPublisher_WithDelayInterrupted() throws Exception {
200 mgr.startPublisher();
204 // tell the publisher to stop in minms + additional time
205 CountDownLatch latch = new CountDownLatch(1);
206 Thread thread = new Thread(() -> {
208 mgr.stopPublisher(minms + 3000L);
212 // wait for the thread to start
215 // interrupt it - it should immediately finish its work
218 // wait for it to stop, but only wait the minimum time
221 assertFalse(thread.isAlive());
225 void testStartConsumer() {
227 verify(source, never()).register(any());
229 mgr.startConsumer(listener);
230 verify(source).register(listener);
232 // restart should have no effect
233 mgr.startConsumer(listener);
234 verify(source).register(listener);
238 void testStopConsumer() {
239 // not consuming yet, so stopping should have no effect
240 mgr.stopConsumer(listener);
241 verify(source, never()).unregister(any());
244 mgr.startConsumer(listener);
246 // this time, stop should do something
247 mgr.stopConsumer(listener);
248 verify(source).unregister(listener);
250 // re-stopping should have no effect
251 mgr.stopConsumer(listener);
252 verify(source).unregister(listener);
256 void testPublish() throws PoolingFeatureException {
257 // cannot publish before starting
258 assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,pre").isInstanceOf(PoolingFeatureException.class);
260 mgr.startPublisher();
262 // publish several messages
264 verify(sink).send(MSG);
266 mgr.publish(MSG + "a");
267 verify(sink).send(MSG + "a");
269 mgr.publish(MSG + "b");
270 verify(sink).send(MSG + "b");
272 // stop and verify we can no longer publish
273 mgr.stopPublisher(0);
274 assertThatThrownBy(() -> mgr.publish(MSG)).as("publish,stopped").isInstanceOf(PoolingFeatureException.class);
278 void testPublish_SendFailed() {
279 mgr.startPublisher();
281 // arrange for send() to fail
282 when(sink.send(MSG)).thenReturn(false);
284 assertThrows(PoolingFeatureException.class, () -> mgr.publish(MSG));
288 void testPublish_SendEx() {
289 mgr.startPublisher();
291 // arrange for send() to throw an exception
292 doThrow(new IllegalStateException(EXPECTED)).when(sink).send(MSG);
294 assertThrows(PoolingFeatureException.class, () -> mgr.publish(MSG));
298 * Manager with overrides.
300 private class TopicMessageManagerImpl extends TopicMessageManager {
302 public TopicMessageManagerImpl(String topic) throws PoolingFeatureException {
307 protected List<TopicSource> getTopicSources() {
310 // three sources, with the desired one in the middle
311 return Arrays.asList(mock(TopicSource.class), source, mock(TopicSource.class));
315 protected List<TopicSink> getTopicSinks() {
318 // three sinks, with the desired one in the middle
319 return Arrays.asList(mock(TopicSink.class), sink, mock(TopicSink.class));