2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023, 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.actorserviceprovider.impl;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
27 import static org.junit.jupiter.api.Assertions.assertEquals;
28 import static org.junit.jupiter.api.Assertions.assertFalse;
29 import static org.junit.jupiter.api.Assertions.assertNotNull;
30 import static org.junit.jupiter.api.Assertions.assertSame;
31 import static org.junit.jupiter.api.Assertions.assertTrue;
32 import static org.mockito.ArgumentMatchers.any;
33 import static org.mockito.ArgumentMatchers.eq;
34 import static org.mockito.Mockito.never;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.when;
38 import java.util.Arrays;
39 import java.util.Collections;
40 import java.util.List;
41 import java.util.concurrent.CompletableFuture;
42 import java.util.function.BiConsumer;
43 import lombok.EqualsAndHashCode;
46 import org.junit.jupiter.api.BeforeEach;
47 import org.junit.jupiter.api.Test;
48 import org.junit.jupiter.api.extension.ExtendWith;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.Captor;
51 import org.mockito.Mock;
52 import org.mockito.Mockito;
53 import org.mockito.junit.jupiter.MockitoExtension;
54 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
55 import org.onap.policy.common.utils.coder.Coder;
56 import org.onap.policy.common.utils.coder.CoderException;
57 import org.onap.policy.common.utils.coder.StandardCoder;
58 import org.onap.policy.common.utils.coder.StandardCoderObject;
59 import org.onap.policy.common.utils.time.PseudoExecutor;
60 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
61 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
62 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
63 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
64 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
65 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
67 @ExtendWith(MockitoExtension.class)
68 class BidirectionalTopicOperationTest {
// Infrastructure value returned by the stubbed handler.getSinkTopicCommInfrastructure().
69 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
// Thrown directly, or wrapped in a CoderException, by the overrides that simulate failures.
70 private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
71 private static final String ACTOR = "my-actor";
72 private static final String OPERATION = "my-operation";
// Request ID stored in the canned response and returned by getExpectedKeyValues().
73 private static final String REQ_ID = "my-request-id";
// Payload used by the String-based operation tests.
74 private static final String TEXT = "some text";
75 private static final int TIMEOUT_SEC = 10;
// TIMEOUT_SEC converted to milliseconds; returned by the stubbed config.getTimeoutMs().
76 private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
// Limit passed to executor.runAll() in the asynchronous tests.
77 private static final int MAX_REQUESTS = 100;
// Encodes and decodes the canned response built in setUp().
79 private static final StandardCoder coder = new StandardCoder();
// Collaborators stubbed in setUp().
// NOTE(review): presumably Mockito-injected mocks/captor (Mock and Captor are imported),
// but the annotations are not visible in this view - confirm.
82 private BidirectionalTopicConfig config;
84 private BidirectionalTopicHandler handler;
86 private Forwarder forwarder;
// Captures the listener passed to forwarder.register().
89 private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
// Per-test fixtures assigned in setUp().
91 private ControlLoopOperationParams params;
92 private OperationOutcome outcome;
// The canned response, held as a decoded StandardCoderObject, as an object, and as JSON text.
93 private StandardCoderObject stdResponse;
94 private MyResponse response;
95 private String responseText;
96 private PseudoExecutor executor;
// Operation under test; some tests replace it with an anonymous subclass.
98 private BidirectionalTopicOperation<MyRequest, MyResponse> oper;
// Stubs the collaborators and rebuilds the fixtures used by the tests.
// NOTE(review): presumably executed before each test (BeforeEach is imported), but the
// annotation is not visible in this view - confirm.
104 void setUp() throws CoderException {
// lenient(): these stubs are not used by every test, so they must not trip Mockito's
// unused-stubbing check
105 Mockito.lenient().when(config.getTopicHandler()).thenReturn(handler);
106 Mockito.lenient().when(config.getForwarder()).thenReturn(forwarder);
107 Mockito.lenient().when(config.getTimeoutMs()).thenReturn(TIMEOUT_MS);
// by default, every send() reports success
109 Mockito.lenient().when(handler.send(any())).thenReturn(true);
110 Mockito.lenient().when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
// NOTE(review): presumably holds submitted tasks until a test calls runAll() - confirm
112 executor = new PseudoExecutor();
114 params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
115 outcome = params.makeOutcome();
// canned response carrying REQ_ID, kept as an object, as JSON text, and as a StandardCoderObject
117 response = new MyResponse();
118 response.setRequestId(REQ_ID);
119 responseText = coder.encode(response);
120 stdResponse = coder.decode(responseText, StandardCoderObject.class);
124 oper = new MyOperation(params, config);
// Verifies that the operation exposes the actor and operation names from params, the
// handler, forwarder and timeout from config, and the response class given to the constructor.
128 void testConstructor_testGetTopicHandler_testGetForwarder_testGetTopicParams() {
129 assertEquals(ACTOR, oper.getActorName());
130 assertEquals(OPERATION, oper.getName());
131 assertSame(handler, oper.getTopicHandler());
132 assertSame(forwarder, oper.getForwarder());
133 assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
134 assertSame(MyResponse.class, oper.getResponseClass());
// Drives startOperationAsync() through three responses: the future must stay incomplete
// after the first two, complete successfully after the third, and the listener must then
// be unregistered.
138 void testStartOperationAsync() throws Exception {
139 // tell it to expect three responses
// NOTE(review): no statement that configures the three-response expectation is visible in
// this view - confirm it was not lost.
142 CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
143 assertFalse(future.isDone());
// the listener is registered under the request ID and the request is sent exactly once
145 verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
147 verify(forwarder, never()).unregister(any(), any());
149 verify(handler).send(any());
151 // provide first response
152 listenerCaptor.getValue().accept(responseText, stdResponse);
153 assertTrue(executor.runAll(MAX_REQUESTS));
154 assertFalse(future.isDone());
156 // provide second response
157 listenerCaptor.getValue().accept(responseText, stdResponse);
158 assertTrue(executor.runAll(MAX_REQUESTS));
159 assertFalse(future.isDone());
161 // provide final response
162 listenerCaptor.getValue().accept(responseText, stdResponse);
163 assertTrue(executor.runAll(MAX_REQUESTS));
164 assertTrue(future.isDone());
// the same outcome object is returned, marked SUCCESS and carrying the decoded response
166 assertSame(outcome, future.get());
167 assertEquals(OperationResult.SUCCESS, outcome.getResult());
168 assertEquals(response, outcome.getResponse());
// once complete, the same listener must have been unregistered
170 verify(forwarder).unregister(Arrays.asList(REQ_ID), listenerCaptor.getValue());
174 * Tests startOperationAsync() when processResponse() throws an exception.
// Verifies that an exception thrown by processResponse() completes the future
// exceptionally and that the listener is still unregistered.
177 void testStartOperationAsyncProcException() throws Exception {
// replace the operation with one whose processResponse() always throws
178 oper = new MyOperation(params, config) {
180 protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
181 StandardCoderObject scoResponse) {
182 throw EXPECTED_EXCEPTION;
186 CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
187 assertFalse(future.isDone());
// the listener is registered, and not yet unregistered, before any response arrives
189 verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
191 verify(forwarder, never()).unregister(any(), any());
193 // provide a response
194 listenerCaptor.getValue().accept(responseText, stdResponse);
195 assertTrue(executor.runAll(MAX_REQUESTS));
196 assertTrue(future.isCompletedExceptionally());
// cleanup must happen even though processing failed
198 verify(forwarder).unregister(Arrays.asList(REQ_ID), listenerCaptor.getValue());
202 * Tests startOperationAsync() when the request cannot be published (send() returns false).
// Verifies that startOperationAsync() throws IllegalStateException when handler.send()
// returns false, and that the listener which was registered is also unregistered.
205 void testStartOperationAsyncPubException() throws Exception {
206 // indicate that nothing was published
207 when(handler.send(any())).thenReturn(false);
209 assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome));
211 verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
213 // must still unregister
214 verify(forwarder).unregister(Arrays.asList(REQ_ID), listenerCaptor.getValue());
// Verifies getTimeoutMs(Integer): null and 0 both yield the configured TIMEOUT_MS, while
// an argument of 5 yields 5000.
218 void testGetTimeoutMsInteger() {
220 assertEquals(TIMEOUT_MS, oper.getTimeoutMs(null));
221 assertEquals(TIMEOUT_MS, oper.getTimeoutMs(0));
223 // use provided value
224 assertEquals(5000, oper.getTimeoutMs(5));
// Verifies that publishRequest() completes without throwing when send() reports success
// (the stubbing installed by setUp()).
228 void testPublishRequest() {
229 assertThatCode(() -> oper.publishRequest(new MyRequest())).doesNotThrowAnyException();
233 * Tests publishRequest() when nothing is published.
// Verifies that publishRequest() throws IllegalStateException when send() returns false.
236 void testPublishRequestUnpublished() {
237 when(handler.send(any())).thenReturn(false);
238 assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
242 * Tests publishRequest() when the request type is a String.
// Verifies that publishRequest() accepts a plain String request without throwing.
245 void testPublishRequestString() {
246 MyStringOperation oper2 = new MyStringOperation(params, config);
247 assertThatCode(() -> oper2.publishRequest(TEXT)).doesNotThrowAnyException();
251 * Tests publishRequest() when the coder throws an exception.
// Verifies that a CoderException raised while encoding the request surfaces from
// publishRequest() as an IllegalArgumentException.
254 void testPublishRequestException() {
// swap in an operation whose coder fails on encode
255 setOperCoderException();
256 assertThatIllegalArgumentException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
260 * Tests processResponse() when it's a success and the response type is a String.
// Verifies that, for a String response type, processResponse() returns the same outcome
// object, marks it SUCCESS, and stores the raw text as the response.
263 void testProcessResponseSuccessString() {
264 MyStringOperation oper2 = new MyStringOperation(params, config);
// no StandardCoderObject is supplied (null) for the String case
266 assertSame(outcome, oper2.processResponse(outcome, TEXT, null));
267 assertEquals(OperationResult.SUCCESS, outcome.getResult());
268 assertEquals(TEXT, outcome.getResponse());
272 * Tests processResponse() when it's a success and the response type is a
273 * StandardCoderObject.
// Verifies that, for a StandardCoderObject response type, processResponse() returns the
// same outcome object, marks it SUCCESS, and stores the supplied StandardCoderObject.
276 void testProcessResponseSuccessSco() {
277 MyScoOperation oper2 = new MyScoOperation(params, config);
279 assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse));
280 assertEquals(OperationResult.SUCCESS, outcome.getResult());
281 assertEquals(stdResponse, outcome.getResponse());
285 * Tests processResponse() when it's a failure.
// Verifies that a response whose output field is set results in a FAILURE outcome that
// still carries the decoded response.
288 void testProcessResponseFailure() throws CoderException {
289 // indicate error in the response
290 MyResponse resp = new MyResponse();
291 resp.setOutput("error");
// re-encode so the raw text and the StandardCoderObject both reflect the error response
293 responseText = coder.encode(resp);
294 stdResponse = coder.decode(responseText, StandardCoderObject.class);
296 assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
297 assertEquals(OperationResult.FAILURE, outcome.getResult());
298 assertEquals(resp, outcome.getResponse());
302 * Tests processResponse() when the decoder succeeds.
// Verifies that the canned response from setUp() yields a SUCCESS outcome whose response
// equals the original MyResponse.
305 void testProcessResponseDecodeOk() throws CoderException {
306 assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
307 assertEquals(OperationResult.SUCCESS, outcome.getResult());
308 assertEquals(response, outcome.getResponse());
312 * Tests processResponse() when the decoder throws an exception.
// Verifies that processResponse() throws IllegalArgumentException when the raw response
// text is not valid JSON.
315 void testProcessResponseDecodeExcept() throws CoderException {
316 assertThatIllegalArgumentException().isThrownBy(
317 () -> oper.processResponse(outcome, "{invalid json", stdResponse));
// Verifies that postProcessResponse() does not throw, even when the raw and decoded
// responses are both null.
321 void testPostProcessResponse() {
322 assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException();
// Verifies that the operation supplies a non-null coder.
326 void testGetCoder() {
327 assertNotNull(oper.getCoder());
331 * Creates a new {@link #oper} whose coder will throw an exception.
// Replaces oper with a MyOperation whose getCoder() returns a StandardCoder that fails
// every encode(Object, boolean) call with a CoderException wrapping EXPECTED_EXCEPTION.
333 private void setOperCoderException() {
334 oper = new MyOperation(params, config) {
336 protected Coder getCoder() {
337 return new StandardCoder() {
339 public String encode(Object object, boolean pretty) throws CoderException {
340 throw new CoderException(EXPECTED_EXCEPTION);
// Request payload used by the tests; theRequestId defaults to REQ_ID.
// NOTE(review): any class-level annotations (e.g. Lombok) are not visible in this view - confirm.
349 static class MyRequest {
350 private String theRequestId = REQ_ID;
351 private String input;
// Response payload used by the tests. The tests call setRequestId(), setOutput() and
// getOutput() and compare instances with assertEquals.
// NOTE(review): those members are presumably Lombok-generated (EqualsAndHashCode is
// imported), but the annotations are not visible in this view - confirm.
357 static class MyResponse {
358 private String requestId;
// left null for a successful response; the failure test sets it to "error"
359 private String output;
// Operation whose request and response types are both String.
363 private class MyStringOperation extends BidirectionalTopicOperation<String, String> {
365 MyStringOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config) {
366 super(params, config, String.class, Collections.emptyList());
// NOTE(review): the body of makeRequest() is not visible in this view - confirm what it returns.
370 protected String makeRequest(int attempt) {
// the only expected key value is the fixed request ID
375 protected List<String> getExpectedKeyValues(int attempt, String request) {
376 return Arrays.asList(REQ_ID);
// any non-null response counts as success
380 protected Status detmStatus(String rawResponse, String response) {
381 return (response != null ? Status.SUCCESS : Status.FAILURE);
// Operation that sends a MyRequest and receives its response as a StandardCoderObject.
386 private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
387 MyScoOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config) {
388 super(params, config, StandardCoderObject.class, Collections.emptyList());
// every attempt sends a fresh default MyRequest
392 protected MyRequest makeRequest(int attempt) {
393 return new MyRequest();
// the only expected key value is the fixed request ID
397 protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
398 return Arrays.asList(REQ_ID);
// success when the response has no "output" field; failure otherwise
402 protected Status detmStatus(String rawResponse, StandardCoderObject response) {
403 return (response.getString("output") == null ? Status.SUCCESS : Status.FAILURE);
// Operation that sends a MyRequest and receives a MyResponse; this is the default
// operation created by setUp().
408 private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
409 MyOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config) {
410 super(params, config, MyResponse.class, Collections.emptyList());
// every attempt sends a fresh default MyRequest
414 protected MyRequest makeRequest(int attempt) {
415 return new MyRequest();
// the only expected key value is the fixed request ID
419 protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
420 return Arrays.asList(REQ_ID);
// NOTE(review): two return statements appear back to back here; the condition that
// chooses between a final SUCCESS/FAILURE status and STILL_WAITING is not visible in
// this view - confirm against the full file.
424 protected Status detmStatus(String rawResponse, MyResponse response) {
// success when the response carries no output; failure otherwise
426 return (response.getOutput() == null ? Status.SUCCESS : Status.FAILURE);
429 return Status.STILL_WAITING;