704b2cb06c13f01e282f52bb76ea0b1b77d4efac
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.client;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNotSame;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.ArgumentMatchers.anyString;
33 import static org.mockito.Mockito.atLeast;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
36
37 import java.util.Arrays;
38 import java.util.Properties;
39 import java.util.concurrent.Semaphore;
40 import java.util.concurrent.TimeUnit;
41 import lombok.AllArgsConstructor;
42 import lombok.Data;
43 import lombok.NoArgsConstructor;
44 import org.junit.After;
45 import org.junit.AfterClass;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Test;
49 import org.junit.runner.RunWith;
50 import org.mockito.ArgumentCaptor;
51 import org.mockito.Mock;
52 import org.mockito.junit.MockitoJUnitRunner;
53 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
54 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
55 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
56 import org.onap.policy.common.endpoints.event.comm.TopicListener;
57 import org.onap.policy.common.endpoints.event.comm.TopicSink;
58 import org.onap.policy.common.endpoints.event.comm.TopicSource;
59 import org.onap.policy.common.utils.coder.Coder;
60 import org.onap.policy.common.utils.coder.CoderException;
61 import org.onap.policy.common.utils.coder.StandardCoder;
62
63 @RunWith(MockitoJUnitRunner.class)
64 public class BidirectionalTopicClientTest {
65     private static final Coder coder = new StandardCoder();
66     private static final long MAX_WAIT_MS = 5000;
67     private static final long SHORT_WAIT_MS = 1;
68     private static final String SINK_TOPIC = "my-sink-topic";
69     private static final String SOURCE_TOPIC = "my-source-topic";
70     private static final String MY_TEXT = "my-text";
71
72     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
73     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
74
75     @Mock
76     private TopicSink sink;
77     @Mock
78     private TopicSource source;
79     @Mock
80     private TopicEndpoint endpoint;
81     @Mock
82     private TopicListener listener;
83
84     private MyMessage theMessage;
85
86     private BidirectionalTopicClient client;
87     private Context context;
88
89     /**
90      * Configures the endpoints.
91      */
92     @BeforeClass
93     public static void setUpBeforeClass() {
94         Properties props = new Properties();
95         props.setProperty("noop.sink.topics", SINK_TOPIC);
96         props.setProperty("noop.source.topics", SOURCE_TOPIC);
97
98         // clear all topics and then configure one sink and one source
99         TopicEndpointManager.getManager().shutdown();
100         TopicEndpointManager.getManager().addTopicSinks(props);
101         TopicEndpointManager.getManager().addTopicSources(props);
102     }
103
104     @AfterClass
105     public static void tearDownAfterClass() {
106         // clear all topics after the tests
107         TopicEndpointManager.getManager().shutdown();
108     }
109
110     /**
111      * Creates mocks and an initial client object.
112      */
113     @Before
114     public void setUp() throws Exception {
115         when(sink.send(anyString())).thenReturn(true);
116         when(sink.getTopicCommInfrastructure()).thenReturn(SINK_INFRA);
117
118         when(source.offer(anyString())).thenReturn(true);
119         when(source.getTopicCommInfrastructure()).thenReturn(SOURCE_INFRA);
120
121         when(endpoint.getTopicSinks(anyString())).thenReturn(Arrays.asList());
122         when(endpoint.getTopicSinks(SINK_TOPIC)).thenReturn(Arrays.asList(sink));
123
124         when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList());
125         when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source));
126
127         theMessage = new MyMessage(MY_TEXT);
128
129         client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC);
130
131         context = new Context();
132     }
133
134     @After
135     public void tearDown() {
136         context.stop();
137     }
138
139     @Test
140     public void testBidirectionalTopicClient_testGetters() {
141         assertSame(sink, client.getSink());
142         assertSame(source, client.getSource());
143         assertEquals(SINK_TOPIC, client.getSinkTopic());
144         assertEquals(SOURCE_TOPIC, client.getSourceTopic());
145         assertEquals(SINK_INFRA, client.getSinkTopicCommInfrastructure());
146         assertEquals(SOURCE_INFRA, client.getSourceTopicCommInfrastructure());
147     }
148
149     /**
150      * Tests the constructor when the sink or source cannot be found.
151      */
152     @Test
153     public void testBidirectionalTopicClientExceptions() {
154         assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
155                         .isInstanceOf(BidirectionalTopicClientException.class)
156                         .hasMessage("no sinks for topic: unknown-sink");
157
158         assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
159                         .isInstanceOf(BidirectionalTopicClientException.class)
160                         .hasMessage("no sources for topic: unknown-source");
161
162         // too many sources
163         when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source, source));
164
165         assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC))
166                         .isInstanceOf(BidirectionalTopicClientException.class)
167                         .hasMessage("too many sources for topic: my-source-topic");
168     }
169
170     /**
171      * Tests the "delegate" methods.
172      */
173     @Test
174     public void testDelegates() {
175         assertTrue(client.send("hello"));
176         verify(sink).send("hello");
177
178         assertTrue(client.offer("incoming"));
179         verify(source).offer("incoming");
180
181         client.register(listener);
182         verify(source).register(listener);
183
184         client.unregister(listener);
185         verify(source).unregister(listener);
186     }
187
188     @Test
189     public void testGetTopicEndpointManager() throws BidirectionalTopicClientException {
190         // use a real manager
191         client = new BidirectionalTopicClient(SINK_TOPIC, SOURCE_TOPIC);
192         assertNotNull(client.getTopicEndpointManager());
193
194         assertNotNull(client.getSink());
195         assertNotNull(client.getSource());
196
197         assertNotSame(sink, client.getSink());
198         assertNotSame(source, client.getSource());
199     }
200
201     @Test
202     public void testAwaitReceipt() throws Exception {
203         context.start(theMessage);
204         assertThat(context.awaitSend(1)).isTrue();
205
206         verify(source).register(any());
207         verify(sink, atLeast(1)).send(any());
208         assertThat(context.checker.isReady()).isFalse();
209
210         inject(theMessage);
211
212         verifyReceipt();
213     }
214
215     @Test
216     public void testAwaitReceipt_AlreadyDone() throws Exception {
217         context.start(theMessage);
218         assertThat(context.awaitSend(1)).isTrue();
219
220         inject(theMessage);
221
222         verifyReceipt();
223
224         // calling again should result in "true" again, without injecting message
225         context.start(theMessage);
226         verifyReceipt();
227     }
228
229     @Test
230     public void testAwaitReceipt_MessageDoesNotMatch() throws Exception {
231         context.start(theMessage);
232         assertThat(context.awaitSend(1)).isTrue();
233
234         // non-matching message
235         inject("{}");
236
237         // wait for a few more calls to "send" and then inject a matching message
238         assertThat(context.awaitSend(3)).isTrue();
239         inject(theMessage);
240
241         verifyReceipt();
242     }
243
244     @Test
245     public void testAwaitReceipt_DecodeFails() throws Exception {
246         context.start(theMessage);
247         assertThat(context.awaitSend(1)).isTrue();
248
249         // force a failure and inject the message
250         context.forceDecodeFailure = true;
251         inject(theMessage);
252
253         assertThat(context.awaitDecodeFailure()).isTrue();
254
255         // no more failures
256         context.forceDecodeFailure = false;
257         inject(theMessage);
258
259         verifyReceipt();
260     }
261
262     @Test
263     public void testAwaitReceipt_Interrupted() throws InterruptedException {
264         context.start(theMessage);
265         assertThat(context.awaitSend(1)).isTrue();
266
267         context.interrupt();
268
269         verifyNoReceipt();
270     }
271
272     @Test
273     public void testAwaitReceipt_MultipleLoops() throws Exception {
274         context.start(theMessage);
275
276         // wait for multiple "send" calls
277         assertThat(context.awaitSend(3)).isTrue();
278
279         inject(theMessage);
280
281         verifyReceipt();
282     }
283
284     @Test
285     public void testStop() throws InterruptedException {
286         context.start(theMessage);
287         assertThat(context.awaitSend(1)).isTrue();
288
289         context.stop();
290
291         verifyNoReceipt();
292     }
293
294     /**
295      * Verifies that awaitReceipt() returns {@code true}.
296      *
297      * @throws InterruptedException if interrupted while waiting for the thread to
298      *         terminate
299      */
300     private void verifyReceipt() throws InterruptedException {
301         assertThat(context.join()).isTrue();
302         assertThat(context.result).isTrue();
303         assertThat(context.exception).isNull();
304         assertThat(context.checker.isReady()).isTrue();
305
306         verify(source).unregister(any());
307     }
308
309     /**
310      * Verifies that awaitReceipt() returns {@code false}.
311      *
312      * @throws InterruptedException if interrupted while waiting for the thread to
313      *         terminate
314      */
315     private void verifyNoReceipt() throws InterruptedException {
316         assertThat(context.join()).isTrue();
317         assertThat(context.result).isFalse();
318         assertThat(context.exception).isNull();
319         assertThat(context.checker.isReady()).isFalse();
320
321         verify(source).unregister(any());
322     }
323
324     /**
325      * Injects a message into the source topic.
326      *
327      * @param message message to be injected
328      * @throws CoderException if the message cannot be encoded
329      */
330     private void inject(MyMessage message) throws CoderException {
331         inject(coder.encode(message));
332     }
333
334     /**
335      * Injects a message into the source topic.
336      *
337      * @param message message to be injected
338      */
339     private void inject(String message) {
340         ArgumentCaptor<TopicListener> cap = ArgumentCaptor.forClass(TopicListener.class);
341         verify(source).register(cap.capture());
342
343         cap.getValue().onTopicEvent(SOURCE_INFRA, SOURCE_TOPIC, message);
344     }
345
346
347     /**
348      * BidirectionalTopicClient with some overrides.
349      */
350     private class BidirectionalTopicClient2 extends BidirectionalTopicClient {
351
352         public BidirectionalTopicClient2(String sinkTopic, String sourceTopic)
353                         throws BidirectionalTopicClientException {
354             super(sinkTopic, sourceTopic);
355         }
356
357         @Override
358         protected TopicEndpoint getTopicEndpointManager() {
359             return endpoint;
360         }
361     }
362
363     private class Context {
364         private Thread thread;
365         private boolean result;
366         private Exception exception;
367         private boolean forceDecodeFailure;
368
369         // released every time the checker publishes a message
370         private final Semaphore sendSem = new Semaphore(0);
371
372         // released every time a message-decode fails
373         private final Semaphore decodeFailedSem = new Semaphore(0);
374
375         private final BidirectionalTopicClient2 checker;
376
377         public Context() throws BidirectionalTopicClientException {
378
379             checker = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC) {
380
381                 @Override
382                 public boolean send(String messageText) {
383                     boolean result = super.send(messageText);
384                     sendSem.release();
385                     return result;
386                 }
387
388                 @Override
389                 protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
390                     if (forceDecodeFailure) {
391                         throw new CoderException("expected exception");
392                     }
393
394                     return super.decode(msg, clazz);
395                 }
396
397                 @Override
398                 protected void decodeFailed() {
399                     super.decodeFailed();
400                     decodeFailedSem.release();
401                 }
402             };
403         }
404
405         /**
406          * Starts the thread.
407          *
408          * @param message message to be sent to the sink topic
409          */
410         public void start(MyMessage message) {
411             thread = new Thread() {
412                 @Override
413                 public void run() {
414                     try {
415                         result = checker.awaitReady(message, SHORT_WAIT_MS);
416                     } catch (Exception e) {
417                         exception = e;
418                     }
419                 }
420             };
421             thread.setDaemon(true);
422             thread.start();
423         }
424
425         public void stop() {
426             checker.stopWaiting();
427         }
428
429         public boolean join() throws InterruptedException {
430             thread.join(MAX_WAIT_MS);
431             return !thread.isAlive();
432         }
433
434         public void interrupt() {
435             thread.interrupt();
436         }
437
438         public boolean awaitSend(int npermits) throws InterruptedException {
439             return sendSem.tryAcquire(npermits, MAX_WAIT_MS, TimeUnit.MILLISECONDS);
440         }
441
442         public boolean awaitDecodeFailure() throws InterruptedException {
443             return decodeFailedSem.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
444         }
445     }
446
447     @Data
448     @NoArgsConstructor
449     @AllArgsConstructor
450     public static class MyMessage {
451         private String text;
452     }
453 }