Exception not propagated by processResponse
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperationTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.ArgumentMatchers.eq;
33 import static org.mockito.Mockito.never;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
36
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.concurrent.CompletableFuture;
40 import java.util.function.BiConsumer;
41 import lombok.Getter;
42 import lombok.Setter;
43 import org.apache.commons.lang3.tuple.Pair;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.ArgumentCaptor;
47 import org.mockito.Captor;
48 import org.mockito.Mock;
49 import org.mockito.MockitoAnnotations;
50 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
51 import org.onap.policy.common.utils.coder.Coder;
52 import org.onap.policy.common.utils.coder.CoderException;
53 import org.onap.policy.common.utils.coder.StandardCoder;
54 import org.onap.policy.common.utils.coder.StandardCoderObject;
55 import org.onap.policy.common.utils.time.PseudoExecutor;
56 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
58 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
59 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
60 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
61 import org.onap.policy.controlloop.policy.PolicyResult;
62
63 public class BidirectionalTopicOperationTest {
64     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
65     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
66     private static final String ACTOR = "my-actor";
67     private static final String OPERATION = "my-operation";
68     private static final String REQ_ID = "my-request-id";
69     private static final String TEXT = "some text";
70     private static final String SUB_REQID = "my-sub-request-id";
71     private static final int TIMEOUT_SEC = 10;
72     private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
73     private static final int MAX_REQUESTS = 100;
74
75     private static final StandardCoder coder = new StandardCoder();
76
77     @Mock
78     private BidirectionalTopicConfig config;
79     @Mock
80     private BidirectionalTopicHandler handler;
81     @Mock
82     private Forwarder forwarder;
83
84     @Captor
85     private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
86
87     private ControlLoopOperationParams params;
88     private OperationOutcome outcome;
89     private StandardCoderObject stdResponse;
90     private String responseText;
91     private PseudoExecutor executor;
92     private int ntimes;
93     private BidirectionalTopicOperation<MyRequest, MyResponse> oper;
94
95     /**
96      * Sets up.
97      */
98     @Before
99     public void setUp() throws CoderException {
100         MockitoAnnotations.initMocks(this);
101
102         when(config.getTopicHandler()).thenReturn(handler);
103         when(config.getForwarder()).thenReturn(forwarder);
104         when(config.getTimeoutMs()).thenReturn(TIMEOUT_MS);
105
106         when(handler.send(any())).thenReturn(true);
107         when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
108
109         executor = new PseudoExecutor();
110
111         params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
112         outcome = params.makeOutcome();
113
114         responseText = coder.encode(new MyResponse());
115         stdResponse = coder.decode(responseText, StandardCoderObject.class);
116
117         ntimes = 1;
118
119         oper = new MyOperation();
120     }
121
122     @Test
123     public void testConstructor_testGetTopicHandler_testGetForwarder_testGetTopicParams() {
124         assertEquals(ACTOR, oper.getActorName());
125         assertEquals(OPERATION, oper.getName());
126         assertSame(handler, oper.getTopicHandler());
127         assertSame(forwarder, oper.getForwarder());
128         assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
129         assertSame(MyResponse.class, oper.getResponseClass());
130     }
131
132     @Test
133     public void testStartOperationAsync() throws Exception {
134
135         // tell it to expect three responses
136         ntimes = 3;
137
138         CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
139         assertFalse(future.isDone());
140
141         assertEquals(SUB_REQID, outcome.getSubRequestId());
142
143         verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
144
145         verify(forwarder, never()).unregister(any(), any());
146
147         verify(handler).send(any());
148
149         // provide first response
150         listenerCaptor.getValue().accept(responseText, stdResponse);
151         assertTrue(executor.runAll(MAX_REQUESTS));
152         assertFalse(future.isDone());
153
154         // provide second response
155         listenerCaptor.getValue().accept(responseText, stdResponse);
156         assertTrue(executor.runAll(MAX_REQUESTS));
157         assertFalse(future.isDone());
158
159         // provide final response
160         listenerCaptor.getValue().accept(responseText, stdResponse);
161         assertTrue(executor.runAll(MAX_REQUESTS));
162         assertTrue(future.isDone());
163
164         assertSame(outcome, future.get());
165         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
166
167         verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
168     }
169
170     /**
171      * Tests startOperationAsync() when processResponse() throws an exception.
172      */
173     @Test
174     public void testStartOperationAsyncProcException() throws Exception {
175         oper = new MyOperation() {
176             @Override
177             protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
178                             StandardCoderObject scoResponse) {
179                 throw EXPECTED_EXCEPTION;
180             }
181         };
182
183         CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
184         assertFalse(future.isDone());
185
186         assertEquals(SUB_REQID, outcome.getSubRequestId());
187
188         verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
189
190         verify(forwarder, never()).unregister(any(), any());
191
192         // provide a response
193         listenerCaptor.getValue().accept(responseText, stdResponse);
194         assertTrue(executor.runAll(MAX_REQUESTS));
195         assertTrue(future.isCompletedExceptionally());
196
197         verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
198     }
199
200     /**
201      * Tests startOperationAsync() when the publisher throws an exception.
202      */
203     @Test
204     public void testStartOperationAsyncPubException() throws Exception {
205         // indicate that nothing was published
206         when(handler.send(any())).thenReturn(false);
207
208         assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome));
209
210         verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
211
212         // must still unregister
213         verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
214     }
215
216     @Test
217     public void testGetTimeoutMsInteger() {
218         // use default
219         assertEquals(TIMEOUT_MS, oper.getTimeoutMs(null));
220         assertEquals(TIMEOUT_MS, oper.getTimeoutMs(0));
221
222         // use provided value
223         assertEquals(5000, oper.getTimeoutMs(5));
224     }
225
226     @Test
227     public void testPublishRequest() {
228         assertThatCode(() -> oper.publishRequest(new MyRequest())).doesNotThrowAnyException();
229     }
230
231     /**
232      * Tests publishRequest() when nothing is published.
233      */
234     @Test
235     public void testPublishRequestUnpublished() {
236         when(handler.send(any())).thenReturn(false);
237         assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
238     }
239
240     /**
241      * Tests publishRequest() when the request type is a String.
242      */
243     @Test
244     public void testPublishRequestString() {
245         MyStringOperation oper2 = new MyStringOperation();
246         assertThatCode(() -> oper2.publishRequest(TEXT)).doesNotThrowAnyException();
247     }
248
249     /**
250      * Tests publishRequest() when the coder throws an exception.
251      */
252     @Test
253     public void testPublishRequestException() {
254         setOperCoderException();
255         assertThatIllegalArgumentException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
256     }
257
258     /**
259      * Tests processResponse() when it's a success and the response type is a String.
260      */
261     @Test
262     public void testProcessResponseSuccessString() {
263         MyStringOperation oper2 = new MyStringOperation();
264
265         assertSame(outcome, oper2.processResponse(outcome, TEXT, null));
266         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
267     }
268
269     /**
270      * Tests processResponse() when it's a success and the response type is a
271      * StandardCoderObject.
272      */
273     @Test
274     public void testProcessResponseSuccessSco() {
275         MyScoOperation oper2 = new MyScoOperation();
276
277         assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse));
278         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
279     }
280
281     /**
282      * Tests processResponse() when it's a failure.
283      */
284     @Test
285     public void testProcessResponseFailure() throws CoderException {
286         // indicate error in the response
287         MyResponse resp = new MyResponse();
288         resp.setOutput("error");
289
290         responseText = coder.encode(resp);
291         stdResponse = coder.decode(responseText, StandardCoderObject.class);
292
293         assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
294         assertEquals(PolicyResult.FAILURE, outcome.getResult());
295     }
296
297     /**
298      * Tests processResponse() when the decoder succeeds.
299      */
300     @Test
301     public void testProcessResponseDecodeOk() throws CoderException {
302         assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
303         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
304     }
305
306     /**
307      * Tests processResponse() when the decoder throws an exception.
308      */
309     @Test
310     public void testProcessResponseDecodeExcept() throws CoderException {
311         // @formatter:off
312         assertThatIllegalArgumentException().isThrownBy(
313             () -> oper.processResponse(outcome, "{invalid json", stdResponse));
314         // @formatter:on
315     }
316
317     @Test
318     public void testPostProcessResponse() {
319         assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException();
320     }
321
322     @Test
323     public void testMakeCoder() {
324         assertNotNull(oper.makeCoder());
325     }
326
327     /**
328      * Creates a new {@link #oper} whose coder will throw an exception.
329      */
330     private void setOperCoderException() {
331         oper = new MyOperation() {
332             @Override
333             protected Coder makeCoder() {
334                 return new StandardCoder() {
335                     @Override
336                     public String encode(Object object, boolean pretty) throws CoderException {
337                         throw new CoderException(EXPECTED_EXCEPTION);
338                     }
339                 };
340             }
341         };
342     }
343
344     @Getter
345     @Setter
346     public static class MyRequest {
347         private String theRequestId = REQ_ID;
348         private String input;
349     }
350
351     @Getter
352     @Setter
353     public static class MyResponse {
354         private String requestId = REQ_ID;
355         private String output;
356     }
357
358
359     private class MyStringOperation extends BidirectionalTopicOperation<String, String> {
360
361         public MyStringOperation() {
362             super(BidirectionalTopicOperationTest.this.params, config, String.class);
363         }
364
365         @Override
366         protected Pair<String, String> makeRequest(int attempt) {
367             return Pair.of(SUB_REQID, TEXT);
368         }
369
370         @Override
371         protected List<String> getExpectedKeyValues(int attempt, String request) {
372             return Arrays.asList(REQ_ID);
373         }
374
375         @Override
376         protected Status detmStatus(String rawResponse, String response) {
377             return (response != null ? Status.SUCCESS : Status.FAILURE);
378         }
379     }
380
381
382     private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
383         public MyScoOperation() {
384             super(BidirectionalTopicOperationTest.this.params, config, StandardCoderObject.class);
385         }
386
387         @Override
388         protected Pair<String, MyRequest> makeRequest(int attempt) {
389             return Pair.of(SUB_REQID, new MyRequest());
390         }
391
392         @Override
393         protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
394             return Arrays.asList(REQ_ID);
395         }
396
397         @Override
398         protected Status detmStatus(String rawResponse, StandardCoderObject response) {
399             return (response.getString("output") == null ? Status.SUCCESS : Status.FAILURE);
400         }
401     }
402
403
404     private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
405         public MyOperation() {
406             super(BidirectionalTopicOperationTest.this.params, config, MyResponse.class);
407         }
408
409         @Override
410         protected Pair<String, MyRequest> makeRequest(int attempt) {
411             return Pair.of(SUB_REQID, new MyRequest());
412         }
413
414         @Override
415         protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
416             return Arrays.asList(REQ_ID);
417         }
418
419         @Override
420         protected Status detmStatus(String rawResponse, MyResponse response) {
421             if (--ntimes <= 0) {
422                 return (response.getOutput() == null ? Status.SUCCESS : Status.FAILURE);
423             }
424
425             return Status.STILL_WAITING;
426         }
427     }
428 }