614e7689990b5837a93b93629b86640b539c6090
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.common.rules.test;
23
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertFalse;
28 import static org.junit.jupiter.api.Assertions.assertNotNull;
29 import static org.junit.jupiter.api.Assertions.assertTrue;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33
34 import java.util.List;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicReference;
38 import org.junit.jupiter.api.AfterAll;
39 import org.junit.jupiter.api.BeforeAll;
40 import org.junit.jupiter.api.BeforeEach;
41 import org.junit.jupiter.api.Test;
42 import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
43 import org.onap.policy.common.message.bus.event.TopicEndpoint;
44 import org.onap.policy.common.message.bus.event.TopicEndpointManager;
45 import org.onap.policy.common.message.bus.event.noop.NoopTopicSink;
46 import org.onap.policy.common.parameters.topic.TopicParameters;
47
48 class ListenerTest {
49     private static final String EXPECTED_EXCEPTION = "expected exception";
50     private static final String MY_TOPIC = "my-topic";
51     private static final String MESSAGE = "the-message";
52     private static final String MESSAGE2 = "other-message";
53     private static final String MSG_SUFFIX = "s";
54     private static final String DECODED_MESSAGE = MESSAGE + MSG_SUFFIX;
55
56     private final NoopTopicSink sink = mock(NoopTopicSink.class);
57
58     private final TopicEndpoint mgr = mock(TopicEndpoint.class);
59
60     private Listener<String> listener;
61
62     /**
63      * Creates topics.
64      */
65     @BeforeAll
66     public static void setUpBeforeClass() {
67         TopicEndpointManager.getManager().shutdown();
68
69         var params = new TopicParameters();
70         params.setTopic(MY_TOPIC);
71         params.setManaged(true);
72         params.setTopicCommInfrastructure("NOOP");
73
74         TopicEndpointManager.getManager().addTopicSinks(List.of(params));
75     }
76
77     @AfterAll
78     public static void tearDownAfterClass() {
79         TopicEndpointManager.getManager().shutdown();
80     }
81
82     /**
83      * Sets up.
84      */
85     @BeforeEach
86     public void setUp() {
87         when(mgr.getNoopTopicSink(MY_TOPIC)).thenReturn(sink);
88
89         listener = new Listener<>(MY_TOPIC, msg -> msg + MSG_SUFFIX) {
90             @Override
91             protected TopicEndpoint getTopicManager() {
92                 return mgr;
93             }
94         };
95     }
96
97     @Test
98     void testListener() {
99         verify(sink).register(listener);
100     }
101
102     @Test
103     void testAwait_testAwaitLongTimeUnit_testIsEmpty() {
104         assertTrue(listener.isEmpty());
105
106         listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE);
107         assertFalse(listener.isEmpty());
108
109         assertEquals(DECODED_MESSAGE, listener.await());
110
111         assertTrue(listener.isEmpty());
112     }
113
114     @Test
115     void testAwaitPredicateOfT() {
116         listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE);
117         listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE2);
118         assertEquals(MESSAGE2 + MSG_SUFFIX, listener.await(msg -> msg.startsWith("other-")));
119     }
120
121     /**
122      * Tests await() when the remaining time is negative.
123      */
124     @Test
125     void testAwaitLongTimeUnitPredicateNoTime() {
126         assertThatThrownBy(() -> listener.await(-1, TimeUnit.SECONDS)).isInstanceOf(TopicException.class);
127     }
128
129     /**
130      * Tests await() when the poll() returns {@code null}.
131      */
132     @Test
133     void testAwaitLongTimeUnitPredicateNoMessage() {
134         assertThatThrownBy(() -> listener.await(1, TimeUnit.MILLISECONDS)).isInstanceOf(TopicException.class);
135     }
136
137     /**
138      * Tests await() when the poll() is interrupted.
139      */
140     @Test
141     void testAwaitLongTimeUnitPredicateInterrupted() throws InterruptedException {
142         listener = new Listener<>(MY_TOPIC, msg -> msg) {
143             @Override
144             protected String pollMessage(long remainingMs) throws InterruptedException {
145                 throw new InterruptedException(EXPECTED_EXCEPTION);
146             }
147         };
148
149         AtomicReference<TopicException> exref = new AtomicReference<>();
150         var interrupted = new CountDownLatch(1);
151
152         var thread = new Thread(() -> {
153             try {
154                 listener.await();
155             } catch (TopicException e) {
156                 exref.set(e);
157             }
158
159             if (Thread.currentThread().isInterrupted()) {
160                 interrupted.countDown();
161             }
162         });
163
164         thread.start();
165         assertTrue(interrupted.await(5, TimeUnit.SECONDS));
166         assertNotNull(exref.get());
167     }
168
169     @Test
170     void testUnregister() {
171         listener.unregister();
172         verify(sink).unregister(listener);
173     }
174
175     @Test
176     void testOnTopicEvent() {
177         listener = new Listener<>(MY_TOPIC, msg -> {
178             throw new IllegalArgumentException(EXPECTED_EXCEPTION);
179         });
180
181         // onTopicEvent() should not throw an exception
182         assertThatCode(() -> listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE))
183                         .doesNotThrowAnyException();
184
185         // should not have queued a message
186         assertTrue(listener.isEmpty());
187     }
188
189     @Test
190     void testGetTopicManager() {
191         // use a listener with a real manager
192         assertNotNull(new Listener<>(MY_TOPIC, msg -> msg).getTopicManager());
193     }
194 }