890fa7209a5a66b22922a27c1254013d6cd198fe
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.client;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertNotNull;
28 import static org.junit.jupiter.api.Assertions.assertNotSame;
29 import static org.junit.jupiter.api.Assertions.assertSame;
30 import static org.junit.jupiter.api.Assertions.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.ArgumentMatchers.anyString;
33 import static org.mockito.Mockito.atLeast;
34 import static org.mockito.Mockito.lenient;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.when;
37
38 import java.util.Arrays;
39 import java.util.Properties;
40 import java.util.concurrent.Semaphore;
41 import java.util.concurrent.TimeUnit;
42 import lombok.AllArgsConstructor;
43 import lombok.Data;
44 import lombok.NoArgsConstructor;
45 import org.junit.jupiter.api.AfterAll;
46 import org.junit.jupiter.api.AfterEach;
47 import org.junit.jupiter.api.BeforeAll;
48 import org.junit.jupiter.api.BeforeEach;
49 import org.junit.jupiter.api.Test;
50 import org.junit.jupiter.api.extension.ExtendWith;
51 import org.mockito.ArgumentCaptor;
52 import org.mockito.Mock;
53 import org.mockito.junit.jupiter.MockitoExtension;
54 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
55 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
56 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
57 import org.onap.policy.common.endpoints.event.comm.TopicListener;
58 import org.onap.policy.common.endpoints.event.comm.TopicSink;
59 import org.onap.policy.common.endpoints.event.comm.TopicSource;
60 import org.onap.policy.common.utils.coder.Coder;
61 import org.onap.policy.common.utils.coder.CoderException;
62 import org.onap.policy.common.utils.coder.StandardCoder;
63
64 @ExtendWith(MockitoExtension.class)
65 class BidirectionalTopicClientTest {
66     private static final Coder coder = new StandardCoder();
67     private static final long MAX_WAIT_MS = 5000;
68     private static final long SHORT_WAIT_MS = 1;
69     private static final String SINK_TOPIC = "my-sink-topic";
70     private static final String SOURCE_TOPIC = "my-source-topic";
71     private static final String MY_TEXT = "my-text";
72
73     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
74     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
75
76     @Mock
77     private TopicSink sink;
78     @Mock
79     private TopicSource source;
80     @Mock
81     private TopicEndpoint endpoint;
82     @Mock
83     private TopicListener listener;
84
85     private MyMessage theMessage;
86
87     private BidirectionalTopicClient client;
88     private Context context;
89
90     /**
91      * Configures the endpoints.
92      */
93     @BeforeAll
94     public static void setUpBeforeClass() {
95         Properties props = new Properties();
96         props.setProperty("noop.sink.topics", SINK_TOPIC);
97         props.setProperty("noop.source.topics", SOURCE_TOPIC);
98
99         // clear all topics and then configure one sink and one source
100         TopicEndpointManager.getManager().shutdown();
101         TopicEndpointManager.getManager().addTopicSinks(props);
102         TopicEndpointManager.getManager().addTopicSources(props);
103     }
104
105     @AfterAll
106     public static void tearDownAfterClass() {
107         // clear all topics after the tests
108         TopicEndpointManager.getManager().shutdown();
109     }
110
111     /**
112      * Creates mocks and an initial client object.
113      */
114     @BeforeEach
115     public void setUp() throws Exception {
116         lenient().when(sink.send(anyString())).thenReturn(true);
117         lenient().when(sink.getTopicCommInfrastructure()).thenReturn(SINK_INFRA);
118
119         lenient().when(source.offer(anyString())).thenReturn(true);
120         lenient().when(source.getTopicCommInfrastructure()).thenReturn(SOURCE_INFRA);
121
122         lenient().when(endpoint.getTopicSinks(anyString())).thenReturn(Arrays.asList());
123         lenient().when(endpoint.getTopicSinks(SINK_TOPIC)).thenReturn(Arrays.asList(sink));
124
125         lenient().when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList());
126         lenient().when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source));
127
128         theMessage = new MyMessage(MY_TEXT);
129
130         client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC);
131
132         context = new Context();
133     }
134
135     @AfterEach
136     public void tearDown() {
137         context.stop();
138     }
139
140     @Test
141     void testBidirectionalTopicClient_testGetters() {
142         assertSame(sink, client.getSink());
143         assertSame(source, client.getSource());
144         assertEquals(SINK_TOPIC, client.getSinkTopic());
145         assertEquals(SOURCE_TOPIC, client.getSourceTopic());
146         assertEquals(SINK_INFRA, client.getSinkTopicCommInfrastructure());
147         assertEquals(SOURCE_INFRA, client.getSourceTopicCommInfrastructure());
148     }
149
150     /**
151      * Tests the constructor when the sink or source cannot be found.
152      */
153     @Test
154     void testBidirectionalTopicClientExceptions() {
155         assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
156                         .isInstanceOf(BidirectionalTopicClientException.class)
157                         .hasMessage("no sinks for topic: unknown-sink");
158
159         assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
160                         .isInstanceOf(BidirectionalTopicClientException.class)
161                         .hasMessage("no sources for topic: unknown-source");
162
163         // too many sources
164         when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source, source));
165
166         assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC))
167                         .isInstanceOf(BidirectionalTopicClientException.class)
168                         .hasMessage("too many sources for topic: my-source-topic");
169     }
170
171     /**
172      * Tests the "delegate" methods.
173      */
174     @Test
175     void testDelegates() {
176         assertTrue(client.send("hello"));
177         verify(sink).send("hello");
178
179         assertTrue(client.offer("incoming"));
180         verify(source).offer("incoming");
181
182         client.register(listener);
183         verify(source).register(listener);
184
185         client.unregister(listener);
186         verify(source).unregister(listener);
187     }
188
189     @Test
190     void testGetTopicEndpointManager() throws BidirectionalTopicClientException {
191         // use a real manager
192         client = new BidirectionalTopicClient(SINK_TOPIC, SOURCE_TOPIC);
193         assertNotNull(client.getTopicEndpointManager());
194
195         assertNotNull(client.getSink());
196         assertNotNull(client.getSource());
197
198         assertNotSame(sink, client.getSink());
199         assertNotSame(source, client.getSource());
200     }
201
202     @Test
203     void testAwaitReceipt() throws Exception {
204         context.start(theMessage);
205         assertThat(context.awaitSend(1)).isTrue();
206
207         verify(source).register(any());
208         verify(sink, atLeast(1)).send(any());
209         assertThat(context.checker.isReady()).isFalse();
210
211         inject(theMessage);
212
213         verifyReceipt();
214     }
215
216     @Test
217     void testAwaitReceipt_AlreadyDone() throws Exception {
218         context.start(theMessage);
219         assertThat(context.awaitSend(1)).isTrue();
220
221         inject(theMessage);
222
223         verifyReceipt();
224
225         // calling again should result in "true" again, without injecting message
226         context.start(theMessage);
227         verifyReceipt();
228     }
229
230     @Test
231     void testAwaitReceipt_MessageDoesNotMatch() throws Exception {
232         context.start(theMessage);
233         assertThat(context.awaitSend(1)).isTrue();
234
235         // non-matching message
236         inject("{}");
237
238         // wait for a few more calls to "send" and then inject a matching message
239         assertThat(context.awaitSend(3)).isTrue();
240         inject(theMessage);
241
242         verifyReceipt();
243     }
244
245     @Test
246     void testAwaitReceipt_DecodeFails() throws Exception {
247         context.start(theMessage);
248         assertThat(context.awaitSend(1)).isTrue();
249
250         // force a failure and inject the message
251         context.forceDecodeFailure = true;
252         inject(theMessage);
253
254         assertThat(context.awaitDecodeFailure()).isTrue();
255
256         // no more failures
257         context.forceDecodeFailure = false;
258         inject(theMessage);
259
260         verifyReceipt();
261     }
262
263     @Test
264     void testAwaitReceipt_Interrupted() throws InterruptedException {
265         context.start(theMessage);
266         assertThat(context.awaitSend(1)).isTrue();
267
268         context.interrupt();
269
270         verifyNoReceipt();
271     }
272
273     @Test
274     void testAwaitReceipt_MultipleLoops() throws Exception {
275         context.start(theMessage);
276
277         // wait for multiple "send" calls
278         assertThat(context.awaitSend(3)).isTrue();
279
280         inject(theMessage);
281
282         verifyReceipt();
283     }
284
285     @Test
286     void testStop() throws InterruptedException {
287         context.start(theMessage);
288         assertThat(context.awaitSend(1)).isTrue();
289
290         context.stop();
291
292         verifyNoReceipt();
293     }
294
295     /**
296      * Verifies that awaitReceipt() returns {@code true}.
297      *
298      * @throws InterruptedException if interrupted while waiting for the thread to
299      *         terminate
300      */
301     private void verifyReceipt() throws InterruptedException {
302         assertThat(context.join()).isTrue();
303         assertThat(context.result).isTrue();
304         assertThat(context.exception).isNull();
305         assertThat(context.checker.isReady()).isTrue();
306
307         verify(source).unregister(any());
308     }
309
310     /**
311      * Verifies that awaitReceipt() returns {@code false}.
312      *
313      * @throws InterruptedException if interrupted while waiting for the thread to
314      *         terminate
315      */
316     private void verifyNoReceipt() throws InterruptedException {
317         assertThat(context.join()).isTrue();
318         assertThat(context.result).isFalse();
319         assertThat(context.exception).isNull();
320         assertThat(context.checker.isReady()).isFalse();
321
322         verify(source).unregister(any());
323     }
324
325     /**
326      * Injects a message into the source topic.
327      *
328      * @param message message to be injected
329      * @throws CoderException if the message cannot be encoded
330      */
331     private void inject(MyMessage message) throws CoderException {
332         inject(coder.encode(message));
333     }
334
335     /**
336      * Injects a message into the source topic.
337      *
338      * @param message message to be injected
339      */
340     private void inject(String message) {
341         ArgumentCaptor<TopicListener> cap = ArgumentCaptor.forClass(TopicListener.class);
342         verify(source).register(cap.capture());
343
344         cap.getValue().onTopicEvent(SOURCE_INFRA, SOURCE_TOPIC, message);
345     }
346
347
348     /**
349      * BidirectionalTopicClient with some overrides.
350      */
351     private class BidirectionalTopicClient2 extends BidirectionalTopicClient {
352
353         public BidirectionalTopicClient2(String sinkTopic, String sourceTopic)
354                         throws BidirectionalTopicClientException {
355             super(sinkTopic, sourceTopic);
356         }
357
358         @Override
359         protected TopicEndpoint getTopicEndpointManager() {
360             return endpoint;
361         }
362     }
363
364     private class Context {
365         private Thread thread;
366         private boolean result;
367         private Exception exception;
368         private boolean forceDecodeFailure;
369
370         // released every time the checker publishes a message
371         private final Semaphore sendSem = new Semaphore(0);
372
373         // released every time a message-decode fails
374         private final Semaphore decodeFailedSem = new Semaphore(0);
375
376         private final BidirectionalTopicClient2 checker;
377
378         public Context() throws BidirectionalTopicClientException {
379
380             checker = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC) {
381
382                 @Override
383                 public boolean send(String messageText) {
384                     boolean result = super.send(messageText);
385                     sendSem.release();
386                     return result;
387                 }
388
389                 @Override
390                 protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
391                     if (forceDecodeFailure) {
392                         throw new CoderException("expected exception");
393                     }
394
395                     return super.decode(msg, clazz);
396                 }
397
398                 @Override
399                 protected void decodeFailed() {
400                     super.decodeFailed();
401                     decodeFailedSem.release();
402                 }
403             };
404         }
405
406         /**
407          * Starts the thread.
408          *
409          * @param message message to be sent to the sink topic
410          */
411         public void start(MyMessage message) {
412             thread = new Thread() {
413                 @Override
414                 public void run() {
415                     try {
416                         result = checker.awaitReady(message, SHORT_WAIT_MS);
417                     } catch (Exception e) {
418                         exception = e;
419                     }
420                 }
421             };
422             thread.setDaemon(true);
423             thread.start();
424         }
425
426         public void stop() {
427             checker.stopWaiting();
428         }
429
430         public boolean join() throws InterruptedException {
431             thread.join(MAX_WAIT_MS);
432             return !thread.isAlive();
433         }
434
435         public void interrupt() {
436             thread.interrupt();
437         }
438
439         public boolean awaitSend(int npermits) throws InterruptedException {
440             return sendSem.tryAcquire(npermits, MAX_WAIT_MS, TimeUnit.MILLISECONDS);
441         }
442
443         public boolean awaitDecodeFailure() throws InterruptedException {
444             return decodeFailedSem.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
445         }
446     }
447
448     @Data
449     @NoArgsConstructor
450     @AllArgsConstructor
451     public static class MyMessage {
452         private String text;
453     }
454 }