Upgrade Java 17 in policy-drools-apps
[policy/drools-applications.git] / controlloop / common / rules-test / src / test / java / org / onap / policy / controlloop / common / rules / test / ListenerTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.common.rules.test;
23
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertFalse;
28 import static org.junit.jupiter.api.Assertions.assertNotNull;
29 import static org.junit.jupiter.api.Assertions.assertTrue;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33
34 import java.util.List;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicReference;
38 import org.junit.jupiter.api.AfterAll;
39 import org.junit.jupiter.api.BeforeAll;
40 import org.junit.jupiter.api.BeforeEach;
41 import org.junit.jupiter.api.Test;
42 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
43 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
44 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
45 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
46 import org.onap.policy.common.endpoints.parameters.TopicParameters;
47
48 class ListenerTest {
49     private static final String EXPECTED_EXCEPTION = "expected exception";
50     private static final String MY_TOPIC = "my-topic";
51     private static final String MESSAGE = "the-message";
52     private static final String MESSAGE2 = "other-message";
53     private static final String MSG_SUFFIX = "s";
54     private static final String DECODED_MESSAGE = MESSAGE + MSG_SUFFIX;
55
56     private final NoopTopicSink sink = mock(NoopTopicSink.class);
57
58     private final TopicEndpoint mgr = mock(TopicEndpoint.class);
59
60     private Listener<String> listener;
61
62     /**
63      * Creates topics.
64      */
65     @BeforeAll
66     public static void setUpBeforeClass() {
67         TopicEndpointManager.getManager().shutdown();
68
69         var params = new TopicParameters();
70         params.setTopic(MY_TOPIC);
71         params.setManaged(true);
72         params.setTopicCommInfrastructure("NOOP");
73
74         TopicEndpointManager.getManager().addTopicSinks(List.of(params));
75     }
76
77     @AfterAll
78     public static void tearDownAfterClass() {
79         TopicEndpointManager.getManager().shutdown();
80     }
81
82     /**
83      * Sets up.
84      */
85     @BeforeEach
86     public void setUp() {
87         when(mgr.getNoopTopicSink(MY_TOPIC)).thenReturn(sink);
88
89         listener = new Listener<>(MY_TOPIC, msg -> msg + MSG_SUFFIX) {
90             @Override
91             protected TopicEndpoint getTopicManager() {
92                 return mgr;
93             }
94         };
95     }
96
97     @Test
98     void testListener() {
99         verify(sink).register(listener);
100     }
101
102     @Test
103     void testAwait_testAwaitLongTimeUnit_testIsEmpty() {
104         assertTrue(listener.isEmpty());
105
106         listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE);
107         assertFalse(listener.isEmpty());
108
109         assertEquals(DECODED_MESSAGE, listener.await());
110
111         assertTrue(listener.isEmpty());
112     }
113
114     @Test
115     void testAwaitPredicateOfT() {
116         listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE);
117         listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE2);
118         assertEquals(MESSAGE2 + MSG_SUFFIX, listener.await(msg -> msg.startsWith("other-")));
119     }
120
121     /**
122      * Tests await() when the remaining time is negative.
123      */
124     @Test
125     void testAwaitLongTimeUnitPredicateNoTime() {
126         assertThatThrownBy(() -> listener.await(-1, TimeUnit.SECONDS)).isInstanceOf(TopicException.class);
127     }
128
129     /**
130      * Tests await() when the poll() returns {@code null}.
131      */
132     @Test
133     void testAwaitLongTimeUnitPredicateNoMessage() {
134         assertThatThrownBy(() -> listener.await(1, TimeUnit.MILLISECONDS)).isInstanceOf(TopicException.class);
135     }
136
137     /**
138      * Tests await() when the poll() is interrupted.
139      */
140     @Test
141     void testAwaitLongTimeUnitPredicateInterrupted() throws InterruptedException {
142         listener = new Listener<String>(MY_TOPIC, msg -> msg) {
143             @Override
144             protected String pollMessage(long remainingMs) throws InterruptedException {
145                 throw new InterruptedException(EXPECTED_EXCEPTION);
146             }
147         };
148
149         AtomicReference<TopicException> exref = new AtomicReference<>();
150         var interrupted = new CountDownLatch(1);
151
152         var thread = new Thread() {
153             @Override
154             public void run() {
155                 try {
156                     listener.await();
157                 } catch (TopicException e) {
158                     exref.set(e);
159                 }
160
161                 if (Thread.currentThread().isInterrupted()) {
162                     interrupted.countDown();
163                 }
164             }
165         };
166
167         thread.start();
168         assertTrue(interrupted.await(5, TimeUnit.SECONDS));
169         assertNotNull(exref.get());
170     }
171
172     @Test
173     void testUnregister() {
174         listener.unregister();
175         verify(sink).unregister(listener);
176     }
177
178     @Test
179     void testOnTopicEvent() {
180         listener = new Listener<>(MY_TOPIC, msg -> {
181             throw new IllegalArgumentException(EXPECTED_EXCEPTION);
182         });
183
184         // onTopicEvent() should not throw an exception
185         assertThatCode(() -> listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE))
186                         .doesNotThrowAnyException();
187
188         // should not have queued a message
189         assertTrue(listener.isEmpty());
190     }
191
192     @Test
193     void testGetTopicManager() {
194         // use a listener with a real manager
195         assertNotNull(new Listener<>(MY_TOPIC, msg -> msg).getTopicManager());
196     }
197 }