2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.message.bus.event.client;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertNotNull;
28 import static org.junit.jupiter.api.Assertions.assertNotSame;
29 import static org.junit.jupiter.api.Assertions.assertSame;
30 import static org.junit.jupiter.api.Assertions.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.ArgumentMatchers.anyString;
33 import static org.mockito.Mockito.atLeast;
34 import static org.mockito.Mockito.lenient;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.when;
38 import java.util.Arrays;
39 import java.util.Properties;
40 import java.util.concurrent.Semaphore;
41 import java.util.concurrent.TimeUnit;
42 import lombok.AllArgsConstructor;
44 import lombok.NoArgsConstructor;
45 import org.junit.jupiter.api.AfterAll;
46 import org.junit.jupiter.api.AfterEach;
47 import org.junit.jupiter.api.BeforeAll;
48 import org.junit.jupiter.api.BeforeEach;
49 import org.junit.jupiter.api.Test;
50 import org.junit.jupiter.api.extension.ExtendWith;
51 import org.mockito.ArgumentCaptor;
52 import org.mockito.Mock;
53 import org.mockito.junit.jupiter.MockitoExtension;
54 import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
55 import org.onap.policy.common.message.bus.event.TopicEndpoint;
56 import org.onap.policy.common.message.bus.event.TopicEndpointManager;
57 import org.onap.policy.common.message.bus.event.TopicListener;
58 import org.onap.policy.common.message.bus.event.TopicSink;
59 import org.onap.policy.common.message.bus.event.TopicSource;
60 import org.onap.policy.common.utils.coder.Coder;
61 import org.onap.policy.common.utils.coder.CoderException;
62 import org.onap.policy.common.utils.coder.StandardCoder;
// Test class for BidirectionalTopicClient; registers the Mockito JUnit 5 extension.
64 @ExtendWith(MockitoExtension.class)
65 class BidirectionalTopicClientTest {
// coder used by inject(MyMessage) to encode a message before it is injected
66 private static final Coder coder = new StandardCoder();
// upper bound, in milliseconds, for the semaphore waits and the thread join in Context
67 private static final long MAX_WAIT_MS = 5000;
// wait time, in milliseconds, that Context.start() passes to awaitReady()
68 private static final long SHORT_WAIT_MS = 1;
// topic names used both for the real NOOP endpoints and for the stubbed endpoint lookups
69 private static final String SINK_TOPIC = "my-sink-topic";
70 private static final String SOURCE_TOPIC = "my-source-topic";
// text used to build the message created in setUp()
71 private static final String MY_TEXT = "my-text";
// infrastructures reported by the stubbed sink and source
73 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
74 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
// collaborators stubbed in setUp()
// NOTE(review): no annotations are visible on these fields in this view - presumably @Mock; confirm
77 private TopicSink sink;
79 private TopicSource source;
81 private TopicEndpoint endpoint;
83 private TopicListener listener;
// message handed to Context.start() by the awaitReceipt tests
85 private MyMessage theMessage;
// client under test, re-created in setUp()
87 private BidirectionalTopicClient client;
// helper that runs awaitReady() on a background thread, re-created in setUp()
88 private Context context;
91 * Configures the endpoints.
// Registers one NOOP sink topic and one NOOP source topic with the manager returned by
// TopicEndpointManager.getManager().
// NOTE(review): the lifecycle annotation is not visible in this view - presumably @BeforeAll; confirm
94 public static void setUpBeforeClass() {
95 Properties props = new Properties();
// the same Properties object carries both the sink and the source topic lists
96 props.setProperty("noop.sink.topics", SINK_TOPIC);
97 props.setProperty("noop.source.topics", SOURCE_TOPIC);
99 // clear all topics and then configure one sink and one source
100 TopicEndpointManager.getManager().shutdown();
101 TopicEndpointManager.getManager().addTopicSinks(props);
102 TopicEndpointManager.getManager().addTopicSources(props);
// Shuts down the manager returned by TopicEndpointManager.getManager().
// NOTE(review): the lifecycle annotation is not visible in this view - presumably @AfterAll; confirm
106 public static void tearDownAfterClass() {
107 // clear all topics after the tests
108 TopicEndpointManager.getManager().shutdown();
112 * Creates mocks and an initial client object.
// Stubs the sink, source and endpoint, then builds the message, the client and the Context.
// All stubs go through lenient(), so tests that never touch a given stub are not failed
// by Mockito's strict-stubbing checks.
115 public void setUp() throws Exception {
// sink accepts every message and reports the sink infrastructure
116 lenient().when(sink.send(anyString())).thenReturn(true);
117 lenient().when(sink.getTopicCommInfrastructure()).thenReturn(SINK_INFRA);
// source accepts every offered message and reports the source infrastructure
119 lenient().when(source.offer(anyString())).thenReturn(true);
120 lenient().when(source.getTopicCommInfrastructure()).thenReturn(SOURCE_INFRA);
// general stub first (empty list for any topic), then the specific stub for SINK_TOPIC
122 lenient().when(endpoint.getTopicSinks(anyString())).thenReturn(Arrays.asList());
123 lenient().when(endpoint.getTopicSinks(SINK_TOPIC)).thenReturn(Arrays.asList(sink));
// same pattern for sources; note that sources are looked up by a list of topic names
125 lenient().when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList());
126 lenient().when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source));
128 theMessage = new MyMessage(MY_TEXT);
// BidirectionalTopicClient2 is the test subclass declared further down in this file
130 client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC);
132 context = new Context();
// Per-test clean-up hook.
// NOTE(review): neither the annotation nor the body of this method is visible in this view.
136 public void tearDown() {
// Checks that the client built in setUp() exposes the stubbed sink and source, the topic
// names it was constructed with, and the infrastructures stubbed on the sink and source.
141 void testBidirectionalTopicClient_testGetters() {
// identity checks: the client must hold the very objects returned by the stubbed endpoint
142 assertSame(sink, client.getSink());
143 assertSame(source, client.getSource());
144 assertEquals(SINK_TOPIC, client.getSinkTopic());
145 assertEquals(SOURCE_TOPIC, client.getSourceTopic());
146 assertEquals(SINK_INFRA, client.getSinkTopicCommInfrastructure());
147 assertEquals(SOURCE_INFRA, client.getSourceTopicCommInfrastructure());
151 * Tests the constructor when the sink or source cannot be found.
// Exercises the three constructor failures asserted below: unknown sink topic, unknown
// source topic, and more than one source for the source topic.
154 void testBidirectionalTopicClientExceptions() {
// "unknown-sink" only matches the general getTopicSinks stub, which returns an empty list
155 assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
156 .isInstanceOf(BidirectionalTopicClientException.class)
157 .hasMessage("no sinks for topic: unknown-sink");
// "unknown-source" only matches the general getTopicSources stub, which returns an empty list
159 assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
160 .isInstanceOf(BidirectionalTopicClientException.class)
161 .hasMessage("no sources for topic: unknown-source");
// re-stub the source lookup so that it returns two entries for the same topic
164 when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source, source));
166 assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC))
167 .isInstanceOf(BidirectionalTopicClientException.class)
168 .hasMessage("too many sources for topic: my-source-topic");
172 * Tests the "delegate" methods.
// Checks that send() reaches the sink and that offer(), register() and unregister()
// reach the source.
175 void testDelegates() {
// the stubbed sink returns true, and the client is expected to pass that value through
176 assertTrue(client.send("hello"));
177 verify(sink).send("hello");
// the stubbed source returns true, and the client is expected to pass that value through
179 assertTrue(client.offer("incoming"));
180 verify(source).offer("incoming");
182 client.register(listener);
183 verify(source).register(listener);
185 client.unregister(listener);
186 verify(source).unregister(listener);
// Builds a plain BidirectionalTopicClient (not the test subclass) and checks that it
// has a non-null endpoint manager, sink and source, none of them the test mocks.
// Uses the same topic names that setUpBeforeClass() registers with the real manager.
190 void testGetTopicEndpointManager() throws BidirectionalTopicClientException {
191 // use a real manager
192 client = new BidirectionalTopicClient(SINK_TOPIC, SOURCE_TOPIC);
193 assertNotNull(client.getTopicEndpointManager());
195 assertNotNull(client.getSink());
196 assertNotNull(client.getSource());
// the sink and source must not be the mocks declared as fields of this test class
198 assertNotSame(sink, client.getSink());
199 assertNotSame(source, client.getSource());
// Starts the background awaitReady() call and checks the state after the first send.
// NOTE(review): the remainder of this method is not visible in this view.
203 void testAwaitReceipt() throws Exception {
204 context.start(theMessage);
// wait (up to MAX_WAIT_MS) for one send permit from the Context
205 assertThat(context.awaitSend(1)).isTrue();
// at this point a listener has been registered on the source and at least one send has occurred
207 verify(source).register(any());
208 verify(sink, atLeast(1)).send(any());
// the checker is expected not to be ready yet
209 assertThat(context.checker.isReady()).isFalse();
// Starts the background awaitReady() call twice with the same message.
// NOTE(review): the steps between the two start() calls, and the final verification,
// are not visible in this view.
217 void testAwaitReceipt_AlreadyDone() throws Exception {
218 context.start(theMessage);
// wait (up to MAX_WAIT_MS) for one send permit from the Context
219 assertThat(context.awaitSend(1)).isTrue();
225 // calling again should result in "true" again, without injecting message
226 context.start(theMessage);
// Starts the background awaitReady() call and waits for further sends.
// NOTE(review): the statements that inject the non-matching and the matching message
// are not visible in this view.
231 void testAwaitReceipt_MessageDoesNotMatch() throws Exception {
232 context.start(theMessage);
// wait (up to MAX_WAIT_MS) for one send permit from the Context
233 assertThat(context.awaitSend(1)).isTrue();
235 // non-matching message
238 // wait for a few more calls to "send" and then inject a matching message
239 assertThat(context.awaitSend(3)).isTrue();
// Makes the Context's decode() override throw, waits for the decode-failure signal,
// then switches the forced failure off again.
// NOTE(review): the injection and final verification steps are not visible in this view.
246 void testAwaitReceipt_DecodeFails() throws Exception {
247 context.start(theMessage);
// wait (up to MAX_WAIT_MS) for one send permit from the Context
248 assertThat(context.awaitSend(1)).isTrue();
250 // force a failure and inject the message
251 context.forceDecodeFailure = true;
// wait (up to MAX_WAIT_MS) for the permit released by the decodeFailed() override
254 assertThat(context.awaitDecodeFailure()).isTrue();
// from here on decode() delegates to the superclass again
257 context.forceDecodeFailure = false;
// Starts the background awaitReady() call and waits for the first send.
// NOTE(review): the interruption and verification steps are not visible in this view.
264 void testAwaitReceipt_Interrupted() throws InterruptedException {
265 context.start(theMessage);
// wait (up to MAX_WAIT_MS) for one send permit from the Context
266 assertThat(context.awaitSend(1)).isTrue();
// Starts the background awaitReady() call and waits until three send permits are available.
// NOTE(review): the remainder of this method is not visible in this view.
274 void testAwaitReceipt_MultipleLoops() throws Exception {
275 context.start(theMessage);
277 // wait for multiple "send" calls
278 assertThat(context.awaitSend(3)).isTrue();
// Starts the background awaitReady() call and waits for the first send.
// NOTE(review): the stop and verification steps are not visible in this view.
286 void testStop() throws InterruptedException {
287 context.start(theMessage);
// wait (up to MAX_WAIT_MS) for one send permit from the Context
288 assertThat(context.awaitSend(1)).isTrue();
296 * Verifies that awaitReceipt() returns {@code true}.
298 * @throws InterruptedException if interrupted while waiting for the thread to complete
// Asserts the "success" outcome of the background thread started by Context.start().
301 private void verifyReceipt() throws InterruptedException {
// join() waits up to MAX_WAIT_MS and reports whether the thread has finished
302 assertThat(context.join()).isTrue();
// awaitReady() returned true and the checker reports ready
303 assertThat(context.result).isTrue();
304 assertThat(context.exception).isNull();
305 assertThat(context.checker.isReady()).isTrue();
// a listener must have been unregistered from the source (verify defaults to exactly one call)
307 verify(source).unregister(any());
311 * Verifies that awaitReceipt() returns {@code false}.
313 * @throws InterruptedException if interrupted while waiting for the thread to complete
// Asserts the "no receipt" outcome of the background thread started by Context.start().
316 private void verifyNoReceipt() throws InterruptedException {
// join() waits up to MAX_WAIT_MS and reports whether the thread has finished
317 assertThat(context.join()).isTrue();
// awaitReady() returned false and the checker does not report ready
318 assertThat(context.result).isFalse();
319 assertThat(context.exception).isNull();
320 assertThat(context.checker.isReady()).isFalse();
// a listener must still have been unregistered from the source (exactly one call)
322 verify(source).unregister(any());
326 * Injects a message into the source topic.
328 * @param message message to be injected
329 * @throws CoderException if the message cannot be encoded
// Encodes the message with the shared coder and hands the text to inject(String).
331 private void inject(MyMessage message) throws CoderException {
332 inject(coder.encode(message));
336 * Injects a message into the source topic.
338 * @param message message to be injected
// Delivers the text to the listener that was registered on the mocked source.
340 private void inject(String message) {
// capture the registered listener; verify() also requires exactly one register() call
341 ArgumentCaptor<TopicListener> cap = ArgumentCaptor.forClass(TopicListener.class);
342 verify(source).register(cap.capture());
// invoke the captured listener directly with the source infrastructure and topic
344 cap.getValue().onTopicEvent(SOURCE_INFRA, SOURCE_TOPIC, message);
349 * BidirectionalTopicClient with some overrides.
// Test subclass of BidirectionalTopicClient that overrides getTopicEndpointManager().
351 private class BidirectionalTopicClient2 extends BidirectionalTopicClient {
// forwards both topic names to the superclass constructor
353 public BidirectionalTopicClient2(String sinkTopic, String sourceTopic)
354 throws BidirectionalTopicClientException {
355 super(sinkTopic, sourceTopic);
// NOTE(review): the body of this override is not visible in this view - presumably it
// returns the enclosing test's "endpoint" field; confirm
359 protected TopicEndpoint getTopicEndpointManager() {
// Helper that runs checker.awaitReady() on a background thread and exposes semaphores
// the tests can wait on.
// NOTE(review): several method headers and short statements of this class are not
// visible in this view; comments below describe only the visible lines.
364 private class Context {
// background thread created by start()
365 private Thread thread;
// value returned by awaitReady() on the background thread
366 private boolean result;
// NOTE(review): presumably set in the catch block of start(); the assignment is not visible
367 private Exception exception;
// when true, the decode() override below throws instead of decoding
368 private boolean forceDecodeFailure;
370 // released every time the checker publishes a message
371 private final Semaphore sendSem = new Semaphore(0);
373 // released every time a message-decode fails
374 private final Semaphore decodeFailedSem = new Semaphore(0);
// client whose awaitReady() is run by the background thread
376 private final BidirectionalTopicClient2 checker;
// builds the checker as an anonymous subclass with instrumented overrides
378 public Context() throws BidirectionalTopicClientException {
380 checker = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC) {
// NOTE(review): the rest of this override is not visible - presumably it releases
// sendSem and returns the result; confirm
383 public boolean send(String messageText) {
384 boolean result = super.send(messageText);
// throws a CoderException on demand so that tests can exercise the decode-failure path
390 protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
391 if (forceDecodeFailure) {
392 throw new CoderException("expected exception");
395 return super.decode(msg, clazz);
// signals waiting tests after the superclass has handled the failure
399 protected void decodeFailed() {
400 super.decodeFailed();
401 decodeFailedSem.release();
409 * @param message message to be sent to the sink topic
// creates a daemon thread that calls awaitReady() and stores its return value in "result"
411 public void start(MyMessage message) {
412 thread = new Thread() {
416 result = checker.awaitReady(message, SHORT_WAIT_MS);
// NOTE(review): the handler body is not visible in this view
417 } catch (Exception e) {
422 thread.setDaemon(true);
// NOTE(review): the header of the method containing the next call is not visible in this view
427 checker.stopWaiting();
// waits up to MAX_WAIT_MS for the thread; returns true if it is no longer alive
430 public boolean join() throws InterruptedException {
431 thread.join(MAX_WAIT_MS);
432 return !thread.isAlive();
// NOTE(review): the body is not visible in this view
435 public void interrupt() {
// waits up to MAX_WAIT_MS to acquire npermits send permits; returns true on success
439 public boolean awaitSend(int npermits) throws InterruptedException {
440 return sendSem.tryAcquire(npermits, MAX_WAIT_MS, TimeUnit.MILLISECONDS);
// waits up to MAX_WAIT_MS for one decode-failure permit; returns true on success
443 public boolean awaitDecodeFailure() throws InterruptedException {
444 return decodeFailedSem.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
451 public static class MyMessage {