2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.client;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNotSame;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.ArgumentMatchers.anyString;
33 import static org.mockito.Mockito.atLeast;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
37 import java.util.Arrays;
38 import java.util.Properties;
39 import java.util.concurrent.Semaphore;
40 import java.util.concurrent.TimeUnit;
41 import lombok.AllArgsConstructor;
43 import lombok.NoArgsConstructor;
44 import org.junit.After;
45 import org.junit.AfterClass;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Test;
49 import org.junit.runner.RunWith;
50 import org.mockito.ArgumentCaptor;
51 import org.mockito.Mock;
52 import org.mockito.junit.MockitoJUnitRunner;
53 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
54 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
55 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
56 import org.onap.policy.common.endpoints.event.comm.TopicListener;
57 import org.onap.policy.common.endpoints.event.comm.TopicSink;
58 import org.onap.policy.common.endpoints.event.comm.TopicSource;
59 import org.onap.policy.common.utils.coder.Coder;
60 import org.onap.policy.common.utils.coder.CoderException;
61 import org.onap.policy.common.utils.coder.StandardCoder;
63 @RunWith(MockitoJUnitRunner.class)
64 public class BidirectionalTopicClientTest {
65 private static final Coder coder = new StandardCoder();
66 private static final long MAX_WAIT_MS = 5000;
67 private static final long SHORT_WAIT_MS = 1;
68 private static final String SINK_TOPIC = "my-sink-topic";
69 private static final String SOURCE_TOPIC = "my-source-topic";
70 private static final String MY_TEXT = "my-text";
72 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
73 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
76 private TopicSink sink;
78 private TopicSource source;
80 private TopicEndpoint endpoint;
82 private TopicListener listener;
84 private MyMessage theMessage;
86 private BidirectionalTopicClient client;
87 private Context context;
90 * Configures the endpoints.
93 public static void setUpBeforeClass() {
94 Properties props = new Properties();
95 props.setProperty("noop.sink.topics", SINK_TOPIC);
96 props.setProperty("noop.source.topics", SOURCE_TOPIC);
98 // clear all topics and then configure one sink and one source
99 TopicEndpointManager.getManager().shutdown();
100 TopicEndpointManager.getManager().addTopicSinks(props);
101 TopicEndpointManager.getManager().addTopicSources(props);
105 public static void tearDownAfterClass() {
106 // clear all topics after the tests
107 TopicEndpointManager.getManager().shutdown();
111 * Creates mocks and an initial client object.
114 public void setUp() throws Exception {
115 when(sink.send(anyString())).thenReturn(true);
116 when(sink.getTopicCommInfrastructure()).thenReturn(SINK_INFRA);
118 when(source.offer(anyString())).thenReturn(true);
119 when(source.getTopicCommInfrastructure()).thenReturn(SOURCE_INFRA);
121 when(endpoint.getTopicSinks(anyString())).thenReturn(Arrays.asList());
122 when(endpoint.getTopicSinks(SINK_TOPIC)).thenReturn(Arrays.asList(sink));
124 when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList());
125 when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source));
127 theMessage = new MyMessage(MY_TEXT);
129 client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC);
131 context = new Context();
135 public void tearDown() {
140 public void testBidirectionalTopicClient_testGetters() {
141 assertSame(sink, client.getSink());
142 assertSame(source, client.getSource());
143 assertEquals(SINK_TOPIC, client.getSinkTopic());
144 assertEquals(SOURCE_TOPIC, client.getSourceTopic());
145 assertEquals(SINK_INFRA, client.getSinkTopicCommInfrastructure());
146 assertEquals(SOURCE_INFRA, client.getSourceTopicCommInfrastructure());
150 * Tests the constructor when the sink or source cannot be found.
153 public void testBidirectionalTopicClientExceptions() {
154 assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
155 .isInstanceOf(BidirectionalTopicClientException.class)
156 .hasMessage("no sinks for topic: unknown-sink");
158 assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
159 .isInstanceOf(BidirectionalTopicClientException.class)
160 .hasMessage("no sources for topic: unknown-source");
163 when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source, source));
165 assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC))
166 .isInstanceOf(BidirectionalTopicClientException.class)
167 .hasMessage("too many sources for topic: my-source-topic");
171 * Tests the "delegate" methods.
174 public void testDelegates() {
175 assertTrue(client.send("hello"));
176 verify(sink).send("hello");
178 assertTrue(client.offer("incoming"));
179 verify(source).offer("incoming");
181 client.register(listener);
182 verify(source).register(listener);
184 client.unregister(listener);
185 verify(source).unregister(listener);
189 public void testGetTopicEndpointManager() throws BidirectionalTopicClientException {
190 // use a real manager
191 client = new BidirectionalTopicClient(SINK_TOPIC, SOURCE_TOPIC);
192 assertNotNull(client.getTopicEndpointManager());
194 assertNotNull(client.getSink());
195 assertNotNull(client.getSource());
197 assertNotSame(sink, client.getSink());
198 assertNotSame(source, client.getSource());
202 public void testAwaitReceipt() throws Exception {
203 context.start(theMessage);
204 assertThat(context.awaitSend(1)).isTrue();
206 verify(source).register(any());
207 verify(sink, atLeast(1)).send(any());
208 assertThat(context.checker.isReady()).isFalse();
216 public void testAwaitReceipt_AlreadyDone() throws Exception {
217 context.start(theMessage);
218 assertThat(context.awaitSend(1)).isTrue();
224 // calling again should result in "true" again, without injecting message
225 context.start(theMessage);
230 public void testAwaitReceipt_MessageDoesNotMatch() throws Exception {
231 context.start(theMessage);
232 assertThat(context.awaitSend(1)).isTrue();
234 // non-matching message
237 // wait for a few more calls to "send" and then inject a matching message
238 assertThat(context.awaitSend(3)).isTrue();
245 public void testAwaitReceipt_DecodeFails() throws Exception {
246 context.start(theMessage);
247 assertThat(context.awaitSend(1)).isTrue();
249 // force a failure and inject the message
250 context.forceDecodeFailure = true;
253 assertThat(context.awaitDecodeFailure()).isTrue();
256 context.forceDecodeFailure = false;
263 public void testAwaitReceipt_Interrupted() throws InterruptedException {
264 context.start(theMessage);
265 assertThat(context.awaitSend(1)).isTrue();
273 public void testAwaitReceipt_MultipleLoops() throws Exception {
274 context.start(theMessage);
276 // wait for multiple "send" calls
277 assertThat(context.awaitSend(3)).isTrue();
285 public void testStop() throws InterruptedException {
286 context.start(theMessage);
287 assertThat(context.awaitSend(1)).isTrue();
295 * Verifies that awaitReceipt() returns {@code true}.
297 * @throws InterruptedException if interrupted while waiting for the thread to
300 private void verifyReceipt() throws InterruptedException {
301 assertThat(context.join()).isTrue();
302 assertThat(context.result).isTrue();
303 assertThat(context.exception).isNull();
304 assertThat(context.checker.isReady()).isTrue();
306 verify(source).unregister(any());
310 * Verifies that awaitReceipt() returns {@code false}.
312 * @throws InterruptedException if interrupted while waiting for the thread to
315 private void verifyNoReceipt() throws InterruptedException {
316 assertThat(context.join()).isTrue();
317 assertThat(context.result).isFalse();
318 assertThat(context.exception).isNull();
319 assertThat(context.checker.isReady()).isFalse();
321 verify(source).unregister(any());
325 * Injects a message into the source topic.
327 * @param message message to be injected
328 * @throws CoderException if the message cannot be encoded
330 private void inject(MyMessage message) throws CoderException {
331 inject(coder.encode(message));
335 * Injects a message into the source topic.
337 * @param message message to be injected
339 private void inject(String message) {
340 ArgumentCaptor<TopicListener> cap = ArgumentCaptor.forClass(TopicListener.class);
341 verify(source).register(cap.capture());
343 cap.getValue().onTopicEvent(SOURCE_INFRA, SOURCE_TOPIC, message);
348 * BidirectionalTopicClient with some overrides.
350 private class BidirectionalTopicClient2 extends BidirectionalTopicClient {
352 public BidirectionalTopicClient2(String sinkTopic, String sourceTopic)
353 throws BidirectionalTopicClientException {
354 super(sinkTopic, sourceTopic);
358 protected TopicEndpoint getTopicEndpointManager() {
363 private class Context {
364 private Thread thread;
365 private boolean result;
366 private Exception exception;
367 private boolean forceDecodeFailure;
369 // released every time the checker publishes a message
370 private final Semaphore sendSem = new Semaphore(0);
372 // released every time a message-decode fails
373 private final Semaphore decodeFailedSem = new Semaphore(0);
375 private final BidirectionalTopicClient2 checker;
377 public Context() throws BidirectionalTopicClientException {
379 checker = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC) {
382 public boolean send(String messageText) {
383 boolean result = super.send(messageText);
389 protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
390 if (forceDecodeFailure) {
391 throw new CoderException("expected exception");
394 return super.decode(msg, clazz);
398 protected void decodeFailed() {
399 super.decodeFailed();
400 decodeFailedSem.release();
408 * @param message message to be sent to the sink topic
410 public void start(MyMessage message) {
411 thread = new Thread() {
415 result = checker.awaitReady(message, SHORT_WAIT_MS);
416 } catch (Exception e) {
421 thread.setDaemon(true);
426 checker.stopWaiting();
429 public boolean join() throws InterruptedException {
430 thread.join(MAX_WAIT_MS);
431 return !thread.isAlive();
434 public void interrupt() {
438 public boolean awaitSend(int npermits) throws InterruptedException {
439 return sendSem.tryAcquire(npermits, MAX_WAIT_MS, TimeUnit.MILLISECONDS);
442 public boolean awaitDecodeFailure() throws InterruptedException {
443 return decodeFailedSem.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
450 public static class MyMessage {