Set sub request ID before start callback
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperationTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.ArgumentMatchers.eq;
33 import static org.mockito.Mockito.never;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
36
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.concurrent.CompletableFuture;
40 import java.util.function.BiConsumer;
41 import lombok.Getter;
42 import lombok.Setter;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.mockito.ArgumentCaptor;
46 import org.mockito.Captor;
47 import org.mockito.Mock;
48 import org.mockito.MockitoAnnotations;
49 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
50 import org.onap.policy.common.utils.coder.Coder;
51 import org.onap.policy.common.utils.coder.CoderException;
52 import org.onap.policy.common.utils.coder.StandardCoder;
53 import org.onap.policy.common.utils.coder.StandardCoderObject;
54 import org.onap.policy.common.utils.time.PseudoExecutor;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
56 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
58 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
59 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
60 import org.onap.policy.controlloop.policy.PolicyResult;
61
62 public class BidirectionalTopicOperationTest {
63     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
64     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
65     private static final String ACTOR = "my-actor";
66     private static final String OPERATION = "my-operation";
67     private static final String REQ_ID = "my-request-id";
68     private static final String TEXT = "some text";
69     private static final int TIMEOUT_SEC = 10;
70     private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
71     private static final int MAX_REQUESTS = 100;
72
73     private static final StandardCoder coder = new StandardCoder();
74
75     @Mock
76     private BidirectionalTopicConfig config;
77     @Mock
78     private BidirectionalTopicHandler handler;
79     @Mock
80     private Forwarder forwarder;
81
82     @Captor
83     private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
84
85     private ControlLoopOperationParams params;
86     private OperationOutcome outcome;
87     private StandardCoderObject stdResponse;
88     private String responseText;
89     private PseudoExecutor executor;
90     private int ntimes;
91     private BidirectionalTopicOperation<MyRequest, MyResponse> oper;
92
93     /**
94      * Sets up.
95      */
96     @Before
97     public void setUp() throws CoderException {
98         MockitoAnnotations.initMocks(this);
99
100         when(config.getTopicHandler()).thenReturn(handler);
101         when(config.getForwarder()).thenReturn(forwarder);
102         when(config.getTimeoutMs()).thenReturn(TIMEOUT_MS);
103
104         when(handler.send(any())).thenReturn(true);
105         when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
106
107         executor = new PseudoExecutor();
108
109         params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
110         outcome = params.makeOutcome();
111
112         responseText = coder.encode(new MyResponse());
113         stdResponse = coder.decode(responseText, StandardCoderObject.class);
114
115         ntimes = 1;
116
117         oper = new MyOperation();
118     }
119
120     @Test
121     public void testConstructor_testGetTopicHandler_testGetForwarder_testGetTopicParams() {
122         assertEquals(ACTOR, oper.getActorName());
123         assertEquals(OPERATION, oper.getName());
124         assertSame(handler, oper.getTopicHandler());
125         assertSame(forwarder, oper.getForwarder());
126         assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
127         assertSame(MyResponse.class, oper.getResponseClass());
128     }
129
130     @Test
131     public void testStartOperationAsync() throws Exception {
132
133         // tell it to expect three responses
134         ntimes = 3;
135
136         CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
137         assertFalse(future.isDone());
138
139         verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
140
141         verify(forwarder, never()).unregister(any(), any());
142
143         verify(handler).send(any());
144
145         // provide first response
146         listenerCaptor.getValue().accept(responseText, stdResponse);
147         assertTrue(executor.runAll(MAX_REQUESTS));
148         assertFalse(future.isDone());
149
150         // provide second response
151         listenerCaptor.getValue().accept(responseText, stdResponse);
152         assertTrue(executor.runAll(MAX_REQUESTS));
153         assertFalse(future.isDone());
154
155         // provide final response
156         listenerCaptor.getValue().accept(responseText, stdResponse);
157         assertTrue(executor.runAll(MAX_REQUESTS));
158         assertTrue(future.isDone());
159
160         assertSame(outcome, future.get());
161         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
162
163         verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
164     }
165
166     /**
167      * Tests startOperationAsync() when processResponse() throws an exception.
168      */
169     @Test
170     public void testStartOperationAsyncProcException() throws Exception {
171         oper = new MyOperation() {
172             @Override
173             protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
174                             StandardCoderObject scoResponse) {
175                 throw EXPECTED_EXCEPTION;
176             }
177         };
178
179         CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
180         assertFalse(future.isDone());
181
182         verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
183
184         verify(forwarder, never()).unregister(any(), any());
185
186         // provide a response
187         listenerCaptor.getValue().accept(responseText, stdResponse);
188         assertTrue(executor.runAll(MAX_REQUESTS));
189         assertTrue(future.isCompletedExceptionally());
190
191         verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
192     }
193
194     /**
195      * Tests startOperationAsync() when the publisher throws an exception.
196      */
197     @Test
198     public void testStartOperationAsyncPubException() throws Exception {
199         // indicate that nothing was published
200         when(handler.send(any())).thenReturn(false);
201
202         assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome));
203
204         verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
205
206         // must still unregister
207         verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
208     }
209
210     @Test
211     public void testGetTimeoutMsInteger() {
212         // use default
213         assertEquals(TIMEOUT_MS, oper.getTimeoutMs(null));
214         assertEquals(TIMEOUT_MS, oper.getTimeoutMs(0));
215
216         // use provided value
217         assertEquals(5000, oper.getTimeoutMs(5));
218     }
219
220     @Test
221     public void testPublishRequest() {
222         assertThatCode(() -> oper.publishRequest(new MyRequest())).doesNotThrowAnyException();
223     }
224
225     /**
226      * Tests publishRequest() when nothing is published.
227      */
228     @Test
229     public void testPublishRequestUnpublished() {
230         when(handler.send(any())).thenReturn(false);
231         assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
232     }
233
234     /**
235      * Tests publishRequest() when the request type is a String.
236      */
237     @Test
238     public void testPublishRequestString() {
239         MyStringOperation oper2 = new MyStringOperation();
240         assertThatCode(() -> oper2.publishRequest(TEXT)).doesNotThrowAnyException();
241     }
242
243     /**
244      * Tests publishRequest() when the coder throws an exception.
245      */
246     @Test
247     public void testPublishRequestException() {
248         setOperCoderException();
249         assertThatIllegalArgumentException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
250     }
251
252     /**
253      * Tests processResponse() when it's a success and the response type is a String.
254      */
255     @Test
256     public void testProcessResponseSuccessString() {
257         MyStringOperation oper2 = new MyStringOperation();
258
259         assertSame(outcome, oper2.processResponse(outcome, TEXT, null));
260         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
261     }
262
263     /**
264      * Tests processResponse() when it's a success and the response type is a
265      * StandardCoderObject.
266      */
267     @Test
268     public void testProcessResponseSuccessSco() {
269         MyScoOperation oper2 = new MyScoOperation();
270
271         assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse));
272         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
273     }
274
275     /**
276      * Tests processResponse() when it's a failure.
277      */
278     @Test
279     public void testProcessResponseFailure() throws CoderException {
280         // indicate error in the response
281         MyResponse resp = new MyResponse();
282         resp.setOutput("error");
283
284         responseText = coder.encode(resp);
285         stdResponse = coder.decode(responseText, StandardCoderObject.class);
286
287         assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
288         assertEquals(PolicyResult.FAILURE, outcome.getResult());
289     }
290
291     /**
292      * Tests processResponse() when the decoder succeeds.
293      */
294     @Test
295     public void testProcessResponseDecodeOk() throws CoderException {
296         assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
297         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
298     }
299
300     /**
301      * Tests processResponse() when the decoder throws an exception.
302      */
303     @Test
304     public void testProcessResponseDecodeExcept() throws CoderException {
305         // @formatter:off
306         assertThatIllegalArgumentException().isThrownBy(
307             () -> oper.processResponse(outcome, "{invalid json", stdResponse));
308         // @formatter:on
309     }
310
311     @Test
312     public void testPostProcessResponse() {
313         assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException();
314     }
315
316     @Test
317     public void testMakeCoder() {
318         assertNotNull(oper.makeCoder());
319     }
320
321     /**
322      * Creates a new {@link #oper} whose coder will throw an exception.
323      */
324     private void setOperCoderException() {
325         oper = new MyOperation() {
326             @Override
327             protected Coder makeCoder() {
328                 return new StandardCoder() {
329                     @Override
330                     public String encode(Object object, boolean pretty) throws CoderException {
331                         throw new CoderException(EXPECTED_EXCEPTION);
332                     }
333                 };
334             }
335         };
336     }
337
338     @Getter
339     @Setter
340     public static class MyRequest {
341         private String theRequestId = REQ_ID;
342         private String input;
343     }
344
345     @Getter
346     @Setter
347     public static class MyResponse {
348         private String requestId = REQ_ID;
349         private String output;
350     }
351
352
353     private class MyStringOperation extends BidirectionalTopicOperation<String, String> {
354
355         public MyStringOperation() {
356             super(BidirectionalTopicOperationTest.this.params, config, String.class);
357         }
358
359         @Override
360         protected String makeRequest(int attempt) {
361             return TEXT;
362         }
363
364         @Override
365         protected List<String> getExpectedKeyValues(int attempt, String request) {
366             return Arrays.asList(REQ_ID);
367         }
368
369         @Override
370         protected Status detmStatus(String rawResponse, String response) {
371             return (response != null ? Status.SUCCESS : Status.FAILURE);
372         }
373     }
374
375
376     private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
377         public MyScoOperation() {
378             super(BidirectionalTopicOperationTest.this.params, config, StandardCoderObject.class);
379         }
380
381         @Override
382         protected MyRequest makeRequest(int attempt) {
383             return new MyRequest();
384         }
385
386         @Override
387         protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
388             return Arrays.asList(REQ_ID);
389         }
390
391         @Override
392         protected Status detmStatus(String rawResponse, StandardCoderObject response) {
393             return (response.getString("output") == null ? Status.SUCCESS : Status.FAILURE);
394         }
395     }
396
397
398     private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
399         public MyOperation() {
400             super(BidirectionalTopicOperationTest.this.params, config, MyResponse.class);
401         }
402
403         @Override
404         protected MyRequest makeRequest(int attempt) {
405             return new MyRequest();
406         }
407
408         @Override
409         protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
410             return Arrays.asList(REQ_ID);
411         }
412
413         @Override
414         protected Status detmStatus(String rawResponse, MyResponse response) {
415             if (--ntimes <= 0) {
416                 return (response.getOutput() == null ? Status.SUCCESS : Status.FAILURE);
417             }
418
419             return Status.STILL_WAITING;
420         }
421     }
422 }