2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.client;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatThrownBy;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertNotSame;
28 import static org.junit.Assert.assertSame;
29 import static org.junit.Assert.assertTrue;
30 import static org.mockito.ArgumentMatchers.any;
31 import static org.mockito.ArgumentMatchers.anyString;
32 import static org.mockito.Mockito.atLeast;
33 import static org.mockito.Mockito.verify;
34 import static org.mockito.Mockito.when;
36 import java.util.Arrays;
37 import java.util.Properties;
38 import java.util.concurrent.Semaphore;
39 import java.util.concurrent.TimeUnit;
40 import lombok.AllArgsConstructor;
42 import lombok.NoArgsConstructor;
43 import org.junit.After;
44 import org.junit.AfterClass;
45 import org.junit.Before;
46 import org.junit.BeforeClass;
47 import org.junit.Test;
48 import org.junit.runner.RunWith;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.Mock;
51 import org.mockito.junit.MockitoJUnitRunner;
52 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
53 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
54 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
55 import org.onap.policy.common.endpoints.event.comm.TopicListener;
56 import org.onap.policy.common.endpoints.event.comm.TopicSink;
57 import org.onap.policy.common.endpoints.event.comm.TopicSource;
58 import org.onap.policy.common.utils.coder.Coder;
59 import org.onap.policy.common.utils.coder.CoderException;
60 import org.onap.policy.common.utils.coder.StandardCoder;
62 @RunWith(MockitoJUnitRunner.class)
63 public class BidirectionalTopicClientTest {
64 private static final Coder coder = new StandardCoder();
65 private static final long MAX_WAIT_MS = 5000;
66 private static final long SHORT_WAIT_MS = 1;
67 private static final String SINK_TOPIC = "my-sink-topic";
68 private static final String SOURCE_TOPIC = "my-source-topic";
69 private static final String MY_TEXT = "my-text";
71 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.UEB;
72 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
75 private TopicSink sink;
77 private TopicSource source;
79 private TopicEndpoint endpoint;
81 private TopicListener listener;
83 private MyMessage theMessage;
85 private BidirectionalTopicClient client;
86 private Context context;
89 * Configures the endpoints.
92 public static void setUpBeforeClass() {
93 Properties props = new Properties();
94 props.setProperty("noop.sink.topics", SINK_TOPIC);
95 props.setProperty("noop.source.topics", SOURCE_TOPIC);
97 // clear all topics and then configure one sink and one source
98 TopicEndpointManager.getManager().shutdown();
99 TopicEndpointManager.getManager().addTopicSinks(props);
100 TopicEndpointManager.getManager().addTopicSources(props);
104 public static void tearDownAfterClass() {
105 // clear all topics after the tests
106 TopicEndpointManager.getManager().shutdown();
110 * Creates mocks and an initial client object.
113 public void setUp() throws Exception {
114 when(sink.send(anyString())).thenReturn(true);
115 when(sink.getTopicCommInfrastructure()).thenReturn(SINK_INFRA);
117 when(source.offer(anyString())).thenReturn(true);
118 when(source.getTopicCommInfrastructure()).thenReturn(SOURCE_INFRA);
120 when(endpoint.getTopicSinks(anyString())).thenReturn(Arrays.asList());
121 when(endpoint.getTopicSinks(SINK_TOPIC)).thenReturn(Arrays.asList(sink));
123 when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList());
124 when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source));
126 theMessage = new MyMessage(MY_TEXT);
128 client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC);
130 context = new Context();
134 public void tearDown() {
139 public void testBidirectionalTopicClient_testGetters() {
140 assertSame(sink, client.getSink());
141 assertSame(source, client.getSource());
142 assertEquals(SINK_TOPIC, client.getSinkTopic());
143 assertEquals(SOURCE_TOPIC, client.getSourceTopic());
144 assertEquals(SINK_INFRA, client.getSinkTopicCommInfrastructure());
145 assertEquals(SOURCE_INFRA, client.getSourceTopicCommInfrastructure());
149 * Tests the constructor when the sink or source cannot be found.
152 public void testBidirectionalTopicClientExceptions() {
153 assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
154 .isInstanceOf(BidirectionalTopicClientException.class)
155 .hasMessage("no sinks for topic: unknown-sink");
157 assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
158 .isInstanceOf(BidirectionalTopicClientException.class)
159 .hasMessage("no sources for topic: unknown-source");
162 when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source, source));
164 assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC))
165 .isInstanceOf(BidirectionalTopicClientException.class)
166 .hasMessage("too many sources for topic: my-source-topic");
170 * Tests the "delegate" methods.
173 public void testDelegates() {
174 assertTrue(client.send("hello"));
175 verify(sink).send("hello");
177 assertTrue(client.offer("incoming"));
178 verify(source).offer("incoming");
180 client.register(listener);
181 verify(source).register(listener);
183 client.unregister(listener);
184 verify(source).unregister(listener);
188 public void testGetTopicEndpointManager() throws BidirectionalTopicClientException {
189 // use a real manager
190 client = new BidirectionalTopicClient(SINK_TOPIC, SOURCE_TOPIC);
191 assertNotNull(client.getTopicEndpointManager());
193 assertNotNull(client.getSink());
194 assertNotNull(client.getSource());
196 assertNotSame(sink, client.getSink());
197 assertNotSame(source, client.getSource());
201 public void testAwaitReceipt() throws Exception {
202 context.start(theMessage);
203 assertThat(context.awaitSend(1)).isTrue();
205 verify(source).register(any());
206 verify(sink, atLeast(1)).send(any());
207 assertThat(context.checker.isReady()).isFalse();
215 public void testAwaitReceipt_AlreadyDone() throws Exception {
216 context.start(theMessage);
217 assertThat(context.awaitSend(1)).isTrue();
223 // calling again should result in "true" again, without injecting message
224 context.start(theMessage);
229 public void testAwaitReceipt_MessageDoesNotMatch() throws Exception {
230 context.start(theMessage);
231 assertThat(context.awaitSend(1)).isTrue();
233 // non-matching message
236 // wait for a few more calls to "send" and then inject a matching message
237 assertThat(context.awaitSend(3)).isTrue();
244 public void testAwaitReceipt_DecodeFails() throws Exception {
245 context.start(theMessage);
246 assertThat(context.awaitSend(1)).isTrue();
248 // force a failure and inject the message
249 context.forceDecodeFailure = true;
252 assertThat(context.awaitDecodeFailure()).isTrue();
255 context.forceDecodeFailure = false;
262 public void testAwaitReceipt_Interrupted() throws InterruptedException {
263 context.start(theMessage);
264 assertThat(context.awaitSend(1)).isTrue();
272 public void testAwaitReceipt_MultipleLoops() throws Exception {
273 context.start(theMessage);
275 // wait for multiple "send" calls
276 assertThat(context.awaitSend(3)).isTrue();
284 public void testStop() throws InterruptedException {
285 context.start(theMessage);
286 assertThat(context.awaitSend(1)).isTrue();
294 * Verifies that awaitReceipt() returns {@code true}.
296 * @throws InterruptedException if interrupted while waiting for the thread to
299 private void verifyReceipt() throws InterruptedException {
300 assertThat(context.join()).isTrue();
301 assertThat(context.result).isTrue();
302 assertThat(context.exception).isNull();
303 assertThat(context.checker.isReady()).isTrue();
305 verify(source).unregister(any());
309 * Verifies that awaitReceipt() returns {@code false}.
311 * @throws InterruptedException if interrupted while waiting for the thread to
314 private void verifyNoReceipt() throws InterruptedException {
315 assertThat(context.join()).isTrue();
316 assertThat(context.result).isFalse();
317 assertThat(context.exception).isNull();
318 assertThat(context.checker.isReady()).isFalse();
320 verify(source).unregister(any());
324 * Injects a message into the source topic.
326 * @param message message to be injected
327 * @throws CoderException if the message cannot be encoded
329 private void inject(MyMessage message) throws CoderException {
330 inject(coder.encode(message));
334 * Injects a message into the source topic.
336 * @param message message to be injected
338 private void inject(String message) {
339 ArgumentCaptor<TopicListener> cap = ArgumentCaptor.forClass(TopicListener.class);
340 verify(source).register(cap.capture());
342 cap.getValue().onTopicEvent(SOURCE_INFRA, SOURCE_TOPIC, message);
347 * BidirectionalTopicClient with some overrides.
349 private class BidirectionalTopicClient2 extends BidirectionalTopicClient {
351 public BidirectionalTopicClient2(String sinkTopic, String sourceTopic)
352 throws BidirectionalTopicClientException {
353 super(sinkTopic, sourceTopic);
357 protected TopicEndpoint getTopicEndpointManager() {
362 private class Context {
363 private Thread thread;
364 private boolean result;
365 private Exception exception;
366 private boolean forceDecodeFailure;
368 // released every time the checker publishes a message
369 private final Semaphore sendSem = new Semaphore(0);
371 // released every time a message-decode fails
372 private final Semaphore decodeFailedSem = new Semaphore(0);
374 private final BidirectionalTopicClient2 checker;
376 public Context() throws BidirectionalTopicClientException {
378 checker = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC) {
381 public boolean send(String messageText) {
382 boolean result = super.send(messageText);
388 protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
389 if (forceDecodeFailure) {
390 throw new CoderException("expected exception");
393 return super.decode(msg, clazz);
397 protected void decodeFailed() {
398 super.decodeFailed();
399 decodeFailedSem.release();
407 * @param message message to be sent to the sink topic
409 public void start(MyMessage message) {
410 thread = new Thread() {
414 result = checker.awaitReady(message, SHORT_WAIT_MS);
415 } catch (Exception e) {
420 thread.setDaemon(true);
425 checker.stopWaiting();
428 public boolean join() throws InterruptedException {
429 thread.join(MAX_WAIT_MS);
430 return !thread.isAlive();
433 public void interrupt() {
437 public boolean awaitSend(int npermits) throws InterruptedException {
438 return sendSem.tryAcquire(npermits, MAX_WAIT_MS, TimeUnit.MILLISECONDS);
441 public boolean awaitDecodeFailure() throws InterruptedException {
442 return decodeFailedSem.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
449 public static class MyMessage {