2605c14bd618829293bfb3867daec7bf5138c42e
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.client;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatThrownBy;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertNotSame;
28 import static org.junit.Assert.assertSame;
29 import static org.junit.Assert.assertTrue;
30 import static org.mockito.ArgumentMatchers.any;
31 import static org.mockito.ArgumentMatchers.anyString;
32 import static org.mockito.Mockito.atLeast;
33 import static org.mockito.Mockito.verify;
34 import static org.mockito.Mockito.when;
35
36 import java.util.Arrays;
37 import java.util.Properties;
38 import java.util.concurrent.Semaphore;
39 import java.util.concurrent.TimeUnit;
40 import lombok.AllArgsConstructor;
41 import lombok.Data;
42 import lombok.NoArgsConstructor;
43 import org.junit.After;
44 import org.junit.AfterClass;
45 import org.junit.Before;
46 import org.junit.BeforeClass;
47 import org.junit.Test;
48 import org.junit.runner.RunWith;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.Mock;
51 import org.mockito.junit.MockitoJUnitRunner;
52 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
53 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
54 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
55 import org.onap.policy.common.endpoints.event.comm.TopicListener;
56 import org.onap.policy.common.endpoints.event.comm.TopicSink;
57 import org.onap.policy.common.endpoints.event.comm.TopicSource;
58 import org.onap.policy.common.utils.coder.Coder;
59 import org.onap.policy.common.utils.coder.CoderException;
60 import org.onap.policy.common.utils.coder.StandardCoder;
61
62 @RunWith(MockitoJUnitRunner.class)
63 public class BidirectionalTopicClientTest {
64     private static final Coder coder = new StandardCoder();
65     private static final long MAX_WAIT_MS = 5000;
66     private static final long SHORT_WAIT_MS = 1;
67     private static final String SINK_TOPIC = "my-sink-topic";
68     private static final String SOURCE_TOPIC = "my-source-topic";
69     private static final String MY_TEXT = "my-text";
70
71     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.UEB;
72     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
73
74     @Mock
75     private TopicSink sink;
76     @Mock
77     private TopicSource source;
78     @Mock
79     private TopicEndpoint endpoint;
80     @Mock
81     private TopicListener listener;
82
83     private MyMessage theMessage;
84
85     private BidirectionalTopicClient client;
86     private Context context;
87
88     /**
89      * Configures the endpoints.
90      */
91     @BeforeClass
92     public static void setUpBeforeClass() {
93         Properties props = new Properties();
94         props.setProperty("noop.sink.topics", SINK_TOPIC);
95         props.setProperty("noop.source.topics", SOURCE_TOPIC);
96
97         // clear all topics and then configure one sink and one source
98         TopicEndpointManager.getManager().shutdown();
99         TopicEndpointManager.getManager().addTopicSinks(props);
100         TopicEndpointManager.getManager().addTopicSources(props);
101     }
102
103     @AfterClass
104     public static void tearDownAfterClass() {
105         // clear all topics after the tests
106         TopicEndpointManager.getManager().shutdown();
107     }
108
109     /**
110      * Creates mocks and an initial client object.
111      */
112     @Before
113     public void setUp() throws Exception {
114         when(sink.send(anyString())).thenReturn(true);
115         when(sink.getTopicCommInfrastructure()).thenReturn(SINK_INFRA);
116
117         when(source.offer(anyString())).thenReturn(true);
118         when(source.getTopicCommInfrastructure()).thenReturn(SOURCE_INFRA);
119
120         when(endpoint.getTopicSinks(anyString())).thenReturn(Arrays.asList());
121         when(endpoint.getTopicSinks(SINK_TOPIC)).thenReturn(Arrays.asList(sink));
122
123         when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList());
124         when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source));
125
126         theMessage = new MyMessage(MY_TEXT);
127
128         client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC);
129
130         context = new Context();
131     }
132
133     @After
134     public void tearDown() {
135         context.stop();
136     }
137
138     @Test
139     public void testBidirectionalTopicClient_testGetters() {
140         assertSame(sink, client.getSink());
141         assertSame(source, client.getSource());
142         assertEquals(SINK_TOPIC, client.getSinkTopic());
143         assertEquals(SOURCE_TOPIC, client.getSourceTopic());
144         assertEquals(SINK_INFRA, client.getSinkTopicCommInfrastructure());
145         assertEquals(SOURCE_INFRA, client.getSourceTopicCommInfrastructure());
146     }
147
148     /**
149      * Tests the constructor when the sink or source cannot be found.
150      */
151     @Test
152     public void testBidirectionalTopicClientExceptions() {
153         assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
154                         .isInstanceOf(BidirectionalTopicClientException.class)
155                         .hasMessage("no sinks for topic: unknown-sink");
156
157         assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
158                         .isInstanceOf(BidirectionalTopicClientException.class)
159                         .hasMessage("no sources for topic: unknown-source");
160
161         // too many sources
162         when(endpoint.getTopicSources(Arrays.asList(SOURCE_TOPIC))).thenReturn(Arrays.asList(source, source));
163
164         assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC))
165                         .isInstanceOf(BidirectionalTopicClientException.class)
166                         .hasMessage("too many sources for topic: my-source-topic");
167     }
168
169     /**
170      * Tests the "delegate" methods.
171      */
172     @Test
173     public void testDelegates() {
174         assertTrue(client.send("hello"));
175         verify(sink).send("hello");
176
177         assertTrue(client.offer("incoming"));
178         verify(source).offer("incoming");
179
180         client.register(listener);
181         verify(source).register(listener);
182
183         client.unregister(listener);
184         verify(source).unregister(listener);
185     }
186
187     @Test
188     public void testGetTopicEndpointManager() throws BidirectionalTopicClientException {
189         // use a real manager
190         client = new BidirectionalTopicClient(SINK_TOPIC, SOURCE_TOPIC);
191         assertNotNull(client.getTopicEndpointManager());
192
193         assertNotNull(client.getSink());
194         assertNotNull(client.getSource());
195
196         assertNotSame(sink, client.getSink());
197         assertNotSame(source, client.getSource());
198     }
199
200     @Test
201     public void testAwaitReceipt() throws Exception {
202         context.start(theMessage);
203         assertThat(context.awaitSend(1)).isTrue();
204
205         verify(source).register(any());
206         verify(sink, atLeast(1)).send(any());
207         assertThat(context.checker.isReady()).isFalse();
208
209         inject(theMessage);
210
211         verifyReceipt();
212     }
213
214     @Test
215     public void testAwaitReceipt_AlreadyDone() throws Exception {
216         context.start(theMessage);
217         assertThat(context.awaitSend(1)).isTrue();
218
219         inject(theMessage);
220
221         verifyReceipt();
222
223         // calling again should result in "true" again, without injecting message
224         context.start(theMessage);
225         verifyReceipt();
226     }
227
228     @Test
229     public void testAwaitReceipt_MessageDoesNotMatch() throws Exception {
230         context.start(theMessage);
231         assertThat(context.awaitSend(1)).isTrue();
232
233         // non-matching message
234         inject("{}");
235
236         // wait for a few more calls to "send" and then inject a matching message
237         assertThat(context.awaitSend(3)).isTrue();
238         inject(theMessage);
239
240         verifyReceipt();
241     }
242
243     @Test
244     public void testAwaitReceipt_DecodeFails() throws Exception {
245         context.start(theMessage);
246         assertThat(context.awaitSend(1)).isTrue();
247
248         // force a failure and inject the message
249         context.forceDecodeFailure = true;
250         inject(theMessage);
251
252         assertThat(context.awaitDecodeFailure()).isTrue();
253
254         // no more failures
255         context.forceDecodeFailure = false;
256         inject(theMessage);
257
258         verifyReceipt();
259     }
260
261     @Test
262     public void testAwaitReceipt_Interrupted() throws InterruptedException {
263         context.start(theMessage);
264         assertThat(context.awaitSend(1)).isTrue();
265
266         context.interrupt();
267
268         verifyNoReceipt();
269     }
270
271     @Test
272     public void testAwaitReceipt_MultipleLoops() throws Exception {
273         context.start(theMessage);
274
275         // wait for multiple "send" calls
276         assertThat(context.awaitSend(3)).isTrue();
277
278         inject(theMessage);
279
280         verifyReceipt();
281     }
282
283     @Test
284     public void testStop() throws InterruptedException {
285         context.start(theMessage);
286         assertThat(context.awaitSend(1)).isTrue();
287
288         context.stop();
289
290         verifyNoReceipt();
291     }
292
293     /**
294      * Verifies that awaitReceipt() returns {@code true}.
295      *
296      * @throws InterruptedException if interrupted while waiting for the thread to
297      *         terminate
298      */
299     private void verifyReceipt() throws InterruptedException {
300         assertThat(context.join()).isTrue();
301         assertThat(context.result).isTrue();
302         assertThat(context.exception).isNull();
303         assertThat(context.checker.isReady()).isTrue();
304
305         verify(source).unregister(any());
306     }
307
308     /**
309      * Verifies that awaitReceipt() returns {@code false}.
310      *
311      * @throws InterruptedException if interrupted while waiting for the thread to
312      *         terminate
313      */
314     private void verifyNoReceipt() throws InterruptedException {
315         assertThat(context.join()).isTrue();
316         assertThat(context.result).isFalse();
317         assertThat(context.exception).isNull();
318         assertThat(context.checker.isReady()).isFalse();
319
320         verify(source).unregister(any());
321     }
322
323     /**
324      * Injects a message into the source topic.
325      *
326      * @param message message to be injected
327      * @throws CoderException if the message cannot be encoded
328      */
329     private void inject(MyMessage message) throws CoderException {
330         inject(coder.encode(message));
331     }
332
333     /**
334      * Injects a message into the source topic.
335      *
336      * @param message message to be injected
337      */
338     private void inject(String message) {
339         ArgumentCaptor<TopicListener> cap = ArgumentCaptor.forClass(TopicListener.class);
340         verify(source).register(cap.capture());
341
342         cap.getValue().onTopicEvent(SOURCE_INFRA, SOURCE_TOPIC, message);
343     }
344
345
346     /**
347      * BidirectionalTopicClient with some overrides.
348      */
349     private class BidirectionalTopicClient2 extends BidirectionalTopicClient {
350
351         public BidirectionalTopicClient2(String sinkTopic, String sourceTopic)
352                         throws BidirectionalTopicClientException {
353             super(sinkTopic, sourceTopic);
354         }
355
356         @Override
357         protected TopicEndpoint getTopicEndpointManager() {
358             return endpoint;
359         }
360     }
361
362     private class Context {
363         private Thread thread;
364         private boolean result;
365         private Exception exception;
366         private boolean forceDecodeFailure;
367
368         // released every time the checker publishes a message
369         private final Semaphore sendSem = new Semaphore(0);
370
371         // released every time a message-decode fails
372         private final Semaphore decodeFailedSem = new Semaphore(0);
373
374         private final BidirectionalTopicClient2 checker;
375
376         public Context() throws BidirectionalTopicClientException {
377
378             checker = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC) {
379
380                 @Override
381                 public boolean send(String messageText) {
382                     boolean result = super.send(messageText);
383                     sendSem.release();
384                     return result;
385                 }
386
387                 @Override
388                 protected <T> T decode(String msg, Class<? extends T> clazz) throws CoderException {
389                     if (forceDecodeFailure) {
390                         throw new CoderException("expected exception");
391                     }
392
393                     return super.decode(msg, clazz);
394                 }
395
396                 @Override
397                 protected void decodeFailed() {
398                     super.decodeFailed();
399                     decodeFailedSem.release();
400                 }
401             };
402         }
403
404         /**
405          * Starts the thread.
406          *
407          * @param message message to be sent to the sink topic
408          */
409         public void start(MyMessage message) {
410             thread = new Thread() {
411                 @Override
412                 public void run() {
413                     try {
414                         result = checker.awaitReady(message, SHORT_WAIT_MS);
415                     } catch (Exception e) {
416                         exception = e;
417                     }
418                 }
419             };
420             thread.setDaemon(true);
421             thread.start();
422         }
423
424         public void stop() {
425             checker.stopWaiting();
426         }
427
428         public boolean join() throws InterruptedException {
429             thread.join(MAX_WAIT_MS);
430             return !thread.isAlive();
431         }
432
433         public void interrupt() {
434             thread.interrupt();
435         }
436
437         public boolean awaitSend(int npermits) throws InterruptedException {
438             return sendSem.tryAcquire(npermits, MAX_WAIT_MS, TimeUnit.MILLISECONDS);
439         }
440
441         public boolean awaitDecodeFailure() throws InterruptedException {
442             return decodeFailedSem.tryAcquire(MAX_WAIT_MS, TimeUnit.MILLISECONDS);
443         }
444     }
445
446     @Data
447     @NoArgsConstructor
448     @AllArgsConstructor
449     public static class MyMessage {
450         private String text;
451     }
452 }