Merge "Add subrequest ID to OperationOutcome"
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperationTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.ArgumentMatchers.eq;
33 import static org.mockito.Mockito.never;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
36
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.concurrent.CompletableFuture;
40 import java.util.function.BiConsumer;
41 import lombok.Getter;
42 import lombok.Setter;
43 import org.apache.commons.lang3.tuple.Pair;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.ArgumentCaptor;
47 import org.mockito.Captor;
48 import org.mockito.Mock;
49 import org.mockito.MockitoAnnotations;
50 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
51 import org.onap.policy.common.utils.coder.Coder;
52 import org.onap.policy.common.utils.coder.CoderException;
53 import org.onap.policy.common.utils.coder.StandardCoder;
54 import org.onap.policy.common.utils.coder.StandardCoderObject;
55 import org.onap.policy.common.utils.time.PseudoExecutor;
56 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
58 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
59 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
60 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
61 import org.onap.policy.controlloop.policy.PolicyResult;
62
63 public class BidirectionalTopicOperationTest {
64     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
65     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
66     private static final String ACTOR = "my-actor";
67     private static final String OPERATION = "my-operation";
68     private static final String REQ_ID = "my-request-id";
69     private static final String TEXT = "some text";
70     private static final String SUB_REQID = "my-sub-request-id";
71     private static final int TIMEOUT_SEC = 10;
72     private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
73     private static final int MAX_REQUESTS = 100;
74
75     private static final StandardCoder coder = new StandardCoder();
76
77     @Mock
78     private BidirectionalTopicConfig config;
79     @Mock
80     private BidirectionalTopicHandler handler;
81     @Mock
82     private Forwarder forwarder;
83
84     @Captor
85     private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
86
87     private ControlLoopOperationParams params;
88     private OperationOutcome outcome;
89     private StandardCoderObject stdResponse;
90     private String responseText;
91     private PseudoExecutor executor;
92     private int ntimes;
93     private BidirectionalTopicOperation<MyRequest, MyResponse> oper;
94
95     /**
96      * Sets up.
97      */
98     @Before
99     public void setUp() throws CoderException {
100         MockitoAnnotations.initMocks(this);
101
102         when(config.getTopicHandler()).thenReturn(handler);
103         when(config.getForwarder()).thenReturn(forwarder);
104         when(config.getTimeoutMs()).thenReturn(TIMEOUT_MS);
105
106         when(handler.send(any())).thenReturn(true);
107         when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
108
109         executor = new PseudoExecutor();
110
111         params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
112         outcome = params.makeOutcome();
113
114         responseText = coder.encode(new MyResponse());
115         stdResponse = coder.decode(responseText, StandardCoderObject.class);
116
117         ntimes = 1;
118
119         oper = new MyOperation();
120     }
121
122     @Test
123     public void testConstructor_testGetTopicHandler_testGetForwarder_testGetTopicParams() {
124         assertEquals(ACTOR, oper.getActorName());
125         assertEquals(OPERATION, oper.getName());
126         assertSame(handler, oper.getTopicHandler());
127         assertSame(forwarder, oper.getForwarder());
128         assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
129         assertSame(MyResponse.class, oper.getResponseClass());
130     }
131
132     @Test
133     public void testStartOperationAsync() throws Exception {
134
135         // tell it to expect three responses
136         ntimes = 3;
137
138         CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
139         assertFalse(future.isDone());
140
141         assertEquals(SUB_REQID, outcome.getSubRequestId());
142
143         verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
144
145         verify(forwarder, never()).unregister(any(), any());
146
147         verify(handler).send(any());
148
149         // provide first response
150         listenerCaptor.getValue().accept(responseText, stdResponse);
151         assertTrue(executor.runAll(MAX_REQUESTS));
152         assertFalse(future.isDone());
153
154         // provide second response
155         listenerCaptor.getValue().accept(responseText, stdResponse);
156         assertTrue(executor.runAll(MAX_REQUESTS));
157         assertFalse(future.isDone());
158
159         // provide final response
160         listenerCaptor.getValue().accept(responseText, stdResponse);
161         assertTrue(executor.runAll(MAX_REQUESTS));
162         assertTrue(future.isDone());
163
164         assertSame(outcome, future.get());
165         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
166
167         verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
168     }
169
170     /**
171      * Tests startOperationAsync() when the publisher throws an exception.
172      */
173     @Test
174     public void testStartOperationAsyncException() throws Exception {
175         // indicate that nothing was published
176         when(handler.send(any())).thenReturn(false);
177
178         assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome));
179
180         verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
181
182         // must still unregister
183         verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
184     }
185
186     @Test
187     public void testGetTimeoutMsInteger() {
188         // use default
189         assertEquals(TIMEOUT_MS, oper.getTimeoutMs(null));
190         assertEquals(TIMEOUT_MS, oper.getTimeoutMs(0));
191
192         // use provided value
193         assertEquals(5000, oper.getTimeoutMs(5));
194     }
195
196     @Test
197     public void testPublishRequest() {
198         assertThatCode(() -> oper.publishRequest(new MyRequest())).doesNotThrowAnyException();
199     }
200
201     /**
202      * Tests publishRequest() when nothing is published.
203      */
204     @Test
205     public void testPublishRequestUnpublished() {
206         when(handler.send(any())).thenReturn(false);
207         assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
208     }
209
210     /**
211      * Tests publishRequest() when the request type is a String.
212      */
213     @Test
214     public void testPublishRequestString() {
215         MyStringOperation oper2 = new MyStringOperation();
216         assertThatCode(() -> oper2.publishRequest(TEXT)).doesNotThrowAnyException();
217     }
218
219     /**
220      * Tests publishRequest() when the coder throws an exception.
221      */
222     @Test
223     public void testPublishRequestException() {
224         setOperCoderException();
225         assertThatIllegalArgumentException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
226     }
227
228     /**
229      * Tests processResponse() when it's a success and the response type is a String.
230      */
231     @Test
232     public void testProcessResponseSuccessString() {
233         MyStringOperation oper2 = new MyStringOperation();
234
235         assertSame(outcome, oper2.processResponse(outcome, TEXT, null));
236         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
237     }
238
239     /**
240      * Tests processResponse() when it's a success and the response type is a
241      * StandardCoderObject.
242      */
243     @Test
244     public void testProcessResponseSuccessSco() {
245         MyScoOperation oper2 = new MyScoOperation();
246
247         assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse));
248         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
249     }
250
251     /**
252      * Tests processResponse() when it's a failure.
253      */
254     @Test
255     public void testProcessResponseFailure() throws CoderException {
256         // indicate error in the response
257         MyResponse resp = new MyResponse();
258         resp.setOutput("error");
259
260         responseText = coder.encode(resp);
261         stdResponse = coder.decode(responseText, StandardCoderObject.class);
262
263         assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
264         assertEquals(PolicyResult.FAILURE, outcome.getResult());
265     }
266
267     /**
268      * Tests processResponse() when the decoder succeeds.
269      */
270     @Test
271     public void testProcessResponseDecodeOk() throws CoderException {
272         assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
273         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
274     }
275
276     /**
277      * Tests processResponse() when the decoder throws an exception.
278      */
279     @Test
280     public void testProcessResponseDecodeExcept() throws CoderException {
281         // @formatter:off
282         assertThatIllegalArgumentException().isThrownBy(
283             () -> oper.processResponse(outcome, "{invalid json", stdResponse));
284         // @formatter:on
285     }
286
287     @Test
288     public void testPostProcessResponse() {
289         assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException();
290     }
291
292     @Test
293     public void testMakeCoder() {
294         assertNotNull(oper.makeCoder());
295     }
296
297     /**
298      * Creates a new {@link #oper} whose coder will throw an exception.
299      */
300     private void setOperCoderException() {
301         oper = new MyOperation() {
302             @Override
303             protected Coder makeCoder() {
304                 return new StandardCoder() {
305                     @Override
306                     public String encode(Object object, boolean pretty) throws CoderException {
307                         throw new CoderException(EXPECTED_EXCEPTION);
308                     }
309                 };
310             }
311         };
312     }
313
314     @Getter
315     @Setter
316     public static class MyRequest {
317         private String theRequestId = REQ_ID;
318         private String input;
319     }
320
321     @Getter
322     @Setter
323     public static class MyResponse {
324         private String requestId = REQ_ID;
325         private String output;
326     }
327
328
329     private class MyStringOperation extends BidirectionalTopicOperation<String, String> {
330
331         public MyStringOperation() {
332             super(BidirectionalTopicOperationTest.this.params, config, String.class);
333         }
334
335         @Override
336         protected Pair<String, String> makeRequest(int attempt) {
337             return Pair.of(SUB_REQID, TEXT);
338         }
339
340         @Override
341         protected List<String> getExpectedKeyValues(int attempt, String request) {
342             return Arrays.asList(REQ_ID);
343         }
344
345         @Override
346         protected Status detmStatus(String rawResponse, String response) {
347             return (response != null ? Status.SUCCESS : Status.FAILURE);
348         }
349     }
350
351
352     private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
353         public MyScoOperation() {
354             super(BidirectionalTopicOperationTest.this.params, config, StandardCoderObject.class);
355         }
356
357         @Override
358         protected Pair<String, MyRequest> makeRequest(int attempt) {
359             return Pair.of(SUB_REQID, new MyRequest());
360         }
361
362         @Override
363         protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
364             return Arrays.asList(REQ_ID);
365         }
366
367         @Override
368         protected Status detmStatus(String rawResponse, StandardCoderObject response) {
369             return (response.getString("output") == null ? Status.SUCCESS : Status.FAILURE);
370         }
371     }
372
373
374     private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
375         public MyOperation() {
376             super(BidirectionalTopicOperationTest.this.params, config, MyResponse.class);
377         }
378
379         @Override
380         protected Pair<String, MyRequest> makeRequest(int attempt) {
381             return Pair.of(SUB_REQID, new MyRequest());
382         }
383
384         @Override
385         protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
386             return Arrays.asList(REQ_ID);
387         }
388
389         @Override
390         protected Status detmStatus(String rawResponse, MyResponse response) {
391             if (--ntimes <= 0) {
392                 return (response.getOutput() == null ? Status.SUCCESS : Status.FAILURE);
393             }
394
395             return Status.STILL_WAITING;
396         }
397     }
398 }