2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.common.rules.test;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertFalse;
28 import static org.junit.jupiter.api.Assertions.assertNotNull;
29 import static org.junit.jupiter.api.Assertions.assertTrue;
30 import static org.mockito.Mockito.mock;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
34 import java.util.List;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicReference;
38 import org.junit.jupiter.api.AfterAll;
39 import org.junit.jupiter.api.BeforeAll;
40 import org.junit.jupiter.api.BeforeEach;
41 import org.junit.jupiter.api.Test;
42 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
43 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
44 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
45 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
46 import org.onap.policy.common.endpoints.parameters.TopicParameters;
49 private static final String EXPECTED_EXCEPTION = "expected exception";
50 private static final String MY_TOPIC = "my-topic";
51 private static final String MESSAGE = "the-message";
52 private static final String MESSAGE2 = "other-message";
53 private static final String MSG_SUFFIX = "s";
54 private static final String DECODED_MESSAGE = MESSAGE + MSG_SUFFIX;
56 private final NoopTopicSink sink = mock(NoopTopicSink.class);
58 private final TopicEndpoint mgr = mock(TopicEndpoint.class);
60 private Listener<String> listener;
66 public static void setUpBeforeClass() {
67 TopicEndpointManager.getManager().shutdown();
69 var params = new TopicParameters();
70 params.setTopic(MY_TOPIC);
71 params.setManaged(true);
72 params.setTopicCommInfrastructure("NOOP");
74 TopicEndpointManager.getManager().addTopicSinks(List.of(params));
78 public static void tearDownAfterClass() {
79 TopicEndpointManager.getManager().shutdown();
87 when(mgr.getNoopTopicSink(MY_TOPIC)).thenReturn(sink);
89 listener = new Listener<>(MY_TOPIC, msg -> msg + MSG_SUFFIX) {
91 protected TopicEndpoint getTopicManager() {
99 verify(sink).register(listener);
103 void testAwait_testAwaitLongTimeUnit_testIsEmpty() {
104 assertTrue(listener.isEmpty());
106 listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE);
107 assertFalse(listener.isEmpty());
109 assertEquals(DECODED_MESSAGE, listener.await());
111 assertTrue(listener.isEmpty());
115 void testAwaitPredicateOfT() {
116 listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE);
117 listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE2);
118 assertEquals(MESSAGE2 + MSG_SUFFIX, listener.await(msg -> msg.startsWith("other-")));
122 * Tests await() when the remaining time is negative.
125 void testAwaitLongTimeUnitPredicateNoTime() {
126 assertThatThrownBy(() -> listener.await(-1, TimeUnit.SECONDS)).isInstanceOf(TopicException.class);
130 * Tests await() when the poll() returns {@code null}.
133 void testAwaitLongTimeUnitPredicateNoMessage() {
134 assertThatThrownBy(() -> listener.await(1, TimeUnit.MILLISECONDS)).isInstanceOf(TopicException.class);
138 * Tests await() when the poll() is interrupted.
141 void testAwaitLongTimeUnitPredicateInterrupted() throws InterruptedException {
142 listener = new Listener<String>(MY_TOPIC, msg -> msg) {
144 protected String pollMessage(long remainingMs) throws InterruptedException {
145 throw new InterruptedException(EXPECTED_EXCEPTION);
149 AtomicReference<TopicException> exref = new AtomicReference<>();
150 var interrupted = new CountDownLatch(1);
152 var thread = new Thread() {
157 } catch (TopicException e) {
161 if (Thread.currentThread().isInterrupted()) {
162 interrupted.countDown();
168 assertTrue(interrupted.await(5, TimeUnit.SECONDS));
169 assertNotNull(exref.get());
173 void testUnregister() {
174 listener.unregister();
175 verify(sink).unregister(listener);
179 void testOnTopicEvent() {
180 listener = new Listener<>(MY_TOPIC, msg -> {
181 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
184 // onTopicEvent() should not throw an exception
185 assertThatCode(() -> listener.onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MESSAGE))
186 .doesNotThrowAnyException();
188 // should not have queued a message
189 assertTrue(listener.isEmpty());
193 void testGetTopicManager() {
194 // use a listener with a real manager
195 assertNotNull(new Listener<>(MY_TOPIC, msg -> msg).getTopicManager());