Fix trailing space in Info.yaml models
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperationTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023, 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.actorserviceprovider.impl;
23
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
27 import static org.junit.jupiter.api.Assertions.assertEquals;
28 import static org.junit.jupiter.api.Assertions.assertFalse;
29 import static org.junit.jupiter.api.Assertions.assertNotNull;
30 import static org.junit.jupiter.api.Assertions.assertSame;
31 import static org.junit.jupiter.api.Assertions.assertTrue;
32 import static org.mockito.ArgumentMatchers.any;
33 import static org.mockito.ArgumentMatchers.eq;
34 import static org.mockito.Mockito.never;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.when;
37
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.concurrent.CompletableFuture;
41 import java.util.function.BiConsumer;
42 import lombok.EqualsAndHashCode;
43 import lombok.Getter;
44 import lombok.Setter;
45 import org.junit.jupiter.api.BeforeEach;
46 import org.junit.jupiter.api.Test;
47 import org.junit.jupiter.api.extension.ExtendWith;
48 import org.mockito.ArgumentCaptor;
49 import org.mockito.Captor;
50 import org.mockito.Mock;
51 import org.mockito.Mockito;
52 import org.mockito.junit.jupiter.MockitoExtension;
53 import org.onap.policy.common.message.bus.event.Topic.CommInfrastructure;
54 import org.onap.policy.common.utils.coder.Coder;
55 import org.onap.policy.common.utils.coder.CoderException;
56 import org.onap.policy.common.utils.coder.StandardCoder;
57 import org.onap.policy.common.utils.coder.StandardCoderObject;
58 import org.onap.policy.common.utils.time.PseudoExecutor;
59 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
60 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
61 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
62 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
63 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
64 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
65
66 @ExtendWith(MockitoExtension.class)
67 class BidirectionalTopicOperationTest {
68     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
69     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
70     private static final String ACTOR = "my-actor";
71     private static final String OPERATION = "my-operation";
72     private static final String REQ_ID = "my-request-id";
73     private static final String TEXT = "some text";
74     private static final int TIMEOUT_SEC = 10;
75     private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
76     private static final int MAX_REQUESTS = 100;
77
78     private static final StandardCoder coder = new StandardCoder();
79
80     @Mock
81     private BidirectionalTopicConfig config;
82     @Mock
83     private BidirectionalTopicHandler handler;
84     @Mock
85     private Forwarder forwarder;
86
87     @Captor
88     private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
89
90     private ControlLoopOperationParams params;
91     private OperationOutcome outcome;
92     private StandardCoderObject stdResponse;
93     private MyResponse response;
94     private String responseText;
95     private PseudoExecutor executor;
96     private int ntimes;
97     private BidirectionalTopicOperation<MyRequest, MyResponse> oper;
98
99     /**
100      * Sets up.
101      */
102     @BeforeEach
103     void setUp() throws CoderException {
104         Mockito.lenient().when(config.getTopicHandler()).thenReturn(handler);
105         Mockito.lenient().when(config.getForwarder()).thenReturn(forwarder);
106         Mockito.lenient().when(config.getTimeoutMs()).thenReturn(TIMEOUT_MS);
107
108         Mockito.lenient().when(handler.send(any())).thenReturn(true);
109         Mockito.lenient().when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
110
111         executor = new PseudoExecutor();
112
113         params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
114         outcome = params.makeOutcome();
115
116         response = new MyResponse();
117         response.setRequestId(REQ_ID);
118         responseText = coder.encode(response);
119         stdResponse = coder.decode(responseText, StandardCoderObject.class);
120
121         ntimes = 1;
122
123         oper = new MyOperation(params, config);
124     }
125
126     @Test
127     void testConstructor_testGetTopicHandler_testGetForwarder_testGetTopicParams() {
128         assertEquals(ACTOR, oper.getActorName());
129         assertEquals(OPERATION, oper.getName());
130         assertSame(handler, oper.getTopicHandler());
131         assertSame(forwarder, oper.getForwarder());
132         assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
133         assertSame(MyResponse.class, oper.getResponseClass());
134     }
135
136     @Test
137     void testStartOperationAsync() throws Exception {
138         // tell it to expect three responses
139         ntimes = 3;
140
141         CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
142         assertFalse(future.isDone());
143
144         verify(forwarder).register(eq(List.of(REQ_ID)), listenerCaptor.capture());
145
146         verify(forwarder, never()).unregister(any(), any());
147
148         verify(handler).send(any());
149
150         // provide first response
151         listenerCaptor.getValue().accept(responseText, stdResponse);
152         assertTrue(executor.runAll(MAX_REQUESTS));
153         assertFalse(future.isDone());
154
155         // provide second response
156         listenerCaptor.getValue().accept(responseText, stdResponse);
157         assertTrue(executor.runAll(MAX_REQUESTS));
158         assertFalse(future.isDone());
159
160         // provide final response
161         listenerCaptor.getValue().accept(responseText, stdResponse);
162         assertTrue(executor.runAll(MAX_REQUESTS));
163         assertTrue(future.isDone());
164
165         assertSame(outcome, future.get());
166         assertEquals(OperationResult.SUCCESS, outcome.getResult());
167         assertEquals(response, outcome.getResponse());
168
169         verify(forwarder).unregister(List.of(REQ_ID), listenerCaptor.getValue());
170     }
171
172     /**
173      * Tests startOperationAsync() when processResponse() throws an exception.
174      */
175     @Test
176     void testStartOperationAsyncProcException() {
177         oper = new MyOperation(params, config) {
178             @Override
179             protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
180                                                        StandardCoderObject scoResponse) {
181                 throw EXPECTED_EXCEPTION;
182             }
183         };
184
185         CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
186         assertFalse(future.isDone());
187
188         verify(forwarder).register(eq(List.of(REQ_ID)), listenerCaptor.capture());
189
190         verify(forwarder, never()).unregister(any(), any());
191
192         // provide a response
193         listenerCaptor.getValue().accept(responseText, stdResponse);
194         assertTrue(executor.runAll(MAX_REQUESTS));
195         assertTrue(future.isCompletedExceptionally());
196
197         verify(forwarder).unregister(List.of(REQ_ID), listenerCaptor.getValue());
198     }
199
200     /**
201      * Tests startOperationAsync() when the publisher throws an exception.
202      */
203     @Test
204      void testStartOperationAsyncPubException() {
205         // indicate that nothing was published
206         when(handler.send(any())).thenReturn(false);
207
208         assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome));
209
210         verify(forwarder).register(eq(List.of(REQ_ID)), listenerCaptor.capture());
211
212         // must still unregister
213         verify(forwarder).unregister(List.of(REQ_ID), listenerCaptor.getValue());
214     }
215
216     @Test
217      void testGetTimeoutMsInteger() {
218         // use default
219         assertEquals(TIMEOUT_MS, oper.getTimeoutMs(null));
220         assertEquals(TIMEOUT_MS, oper.getTimeoutMs(0));
221
222         // use provided value
223         assertEquals(5000, oper.getTimeoutMs(5));
224     }
225
226     @Test
227      void testPublishRequest() {
228         assertThatCode(() -> oper.publishRequest(new MyRequest())).doesNotThrowAnyException();
229     }
230
231     /**
232      * Tests publishRequest() when nothing is published.
233      */
234     @Test
235      void testPublishRequestUnpublished() {
236         when(handler.send(any())).thenReturn(false);
237         assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
238     }
239
240     /**
241      * Tests publishRequest() when the request type is a String.
242      */
243     @Test
244      void testPublishRequestString() {
245         MyStringOperation oper2 = new MyStringOperation(params, config);
246         assertThatCode(() -> oper2.publishRequest(TEXT)).doesNotThrowAnyException();
247     }
248
249     /**
250      * Tests publishRequest() when the coder throws an exception.
251      */
252     @Test
253      void testPublishRequestException() {
254         setOperCoderException();
255         assertThatIllegalArgumentException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
256     }
257
258     /**
259      * Tests processResponse() when it's a success and the response type is a String.
260      */
261     @Test
262      void testProcessResponseSuccessString() {
263         MyStringOperation oper2 = new MyStringOperation(params, config);
264
265         assertSame(outcome, oper2.processResponse(outcome, TEXT, null));
266         assertEquals(OperationResult.SUCCESS, outcome.getResult());
267         assertEquals(TEXT, outcome.getResponse());
268     }
269
270     /**
271      * Tests processResponse() when it's a success and the response type is a
272      * StandardCoderObject.
273      */
274     @Test
275      void testProcessResponseSuccessSco() {
276         MyScoOperation oper2 = new MyScoOperation(params, config);
277
278         assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse));
279         assertEquals(OperationResult.SUCCESS, outcome.getResult());
280         assertEquals(stdResponse, outcome.getResponse());
281     }
282
283     /**
284      * Tests processResponse() when it's a failure.
285      */
286     @Test
287      void testProcessResponseFailure() throws CoderException {
288         // indicate error in the response
289         MyResponse resp = new MyResponse();
290         resp.setOutput("error");
291
292         responseText = coder.encode(resp);
293         stdResponse = coder.decode(responseText, StandardCoderObject.class);
294
295         assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
296         assertEquals(OperationResult.FAILURE, outcome.getResult());
297         assertEquals(resp, outcome.getResponse());
298     }
299
300     /**
301      * Tests processResponse() when the decoder succeeds.
302      */
303     @Test
304      void testProcessResponseDecodeOk() {
305         assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
306         assertEquals(OperationResult.SUCCESS, outcome.getResult());
307         assertEquals(response, outcome.getResponse());
308     }
309
310     /**
311      * Tests processResponse() when the decoder throws an exception.
312      */
313     @Test
314      void testProcessResponseDecodeExcept() {
315         assertThatIllegalArgumentException().isThrownBy(
316             () -> oper.processResponse(outcome, "{invalid json", stdResponse));
317     }
318
319     @Test
320      void testPostProcessResponse() {
321         assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException();
322     }
323
324     @Test
325      void testGetCoder() {
326         assertNotNull(oper.getCoder());
327     }
328
329     /**
330      * Creates a new {@link #oper} whose coder will throw an exception.
331      */
332     private void setOperCoderException() {
333         oper = new MyOperation(params, config) {
334             @Override
335             protected Coder getCoder() {
336                 return new StandardCoder() {
337                     @Override
338                     public String encode(Object object, boolean pretty) throws CoderException {
339                         throw new CoderException(EXPECTED_EXCEPTION);
340                     }
341                 };
342             }
343         };
344     }
345
346     @Getter
347     @Setter
348     static class MyRequest {
349         private String theRequestId = REQ_ID;
350         private String input;
351     }
352
353     @Getter
354     @Setter
355     @EqualsAndHashCode
356     static class MyResponse {
357         private String requestId;
358         private String output;
359     }
360
361
362     private static class MyStringOperation extends BidirectionalTopicOperation<String, String> {
363
364         MyStringOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config) {
365             super(params, config, String.class, Collections.emptyList());
366         }
367
368         @Override
369         protected String makeRequest(int attempt) {
370             return TEXT;
371         }
372
373         @Override
374         protected List<String> getExpectedKeyValues(int attempt, String request) {
375             return List.of(REQ_ID);
376         }
377
378         @Override
379         protected Status detmStatus(String rawResponse, String response) {
380             return (response != null ? Status.SUCCESS : Status.FAILURE);
381         }
382     }
383
384
385     private static class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
386         MyScoOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config) {
387             super(params, config, StandardCoderObject.class, Collections.emptyList());
388         }
389
390         @Override
391         protected MyRequest makeRequest(int attempt) {
392             return new MyRequest();
393         }
394
395         @Override
396         protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
397             return List.of(REQ_ID);
398         }
399
400         @Override
401         protected Status detmStatus(String rawResponse, StandardCoderObject response) {
402             return (response.getString("output") == null ? Status.SUCCESS : Status.FAILURE);
403         }
404     }
405
406
407     private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
408         MyOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config) {
409             super(params, config, MyResponse.class, Collections.emptyList());
410         }
411
412         @Override
413         protected MyRequest makeRequest(int attempt) {
414             return new MyRequest();
415         }
416
417         @Override
418         protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
419             return List.of(REQ_ID);
420         }
421
422         @Override
423         protected Status detmStatus(String rawResponse, MyResponse response) {
424             if (--ntimes <= 0) {
425                 return (response.getOutput() == null ? Status.SUCCESS : Status.FAILURE);
426             }
427
428             return Status.STILL_WAITING;
429         }
430     }
431 }