2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.ArgumentMatchers.eq;
33 import static org.mockito.Mockito.never;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
37 import java.util.Arrays;
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.concurrent.CompletableFuture;
41 import java.util.function.BiConsumer;
42 import lombok.EqualsAndHashCode;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.mockito.ArgumentCaptor;
49 import org.mockito.Captor;
50 import org.mockito.Mock;
51 import org.mockito.junit.MockitoJUnitRunner;
52 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
53 import org.onap.policy.common.utils.coder.Coder;
54 import org.onap.policy.common.utils.coder.CoderException;
55 import org.onap.policy.common.utils.coder.StandardCoder;
56 import org.onap.policy.common.utils.coder.StandardCoderObject;
57 import org.onap.policy.common.utils.time.PseudoExecutor;
58 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
59 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
60 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
61 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
62 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
63 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
65 @RunWith(MockitoJUnitRunner.class)
66 public class BidirectionalTopicOperationTest {
67 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
68 private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
69 private static final String ACTOR = "my-actor";
70 private static final String OPERATION = "my-operation";
71 private static final String REQ_ID = "my-request-id";
72 private static final String TEXT = "some text";
73 private static final int TIMEOUT_SEC = 10;
74 private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
75 private static final int MAX_REQUESTS = 100;
77 private static final StandardCoder coder = new StandardCoder();
80 private BidirectionalTopicConfig config;
82 private BidirectionalTopicHandler handler;
84 private Forwarder forwarder;
87 private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
89 private ControlLoopOperationParams params;
90 private OperationOutcome outcome;
91 private StandardCoderObject stdResponse;
92 private MyResponse response;
93 private String responseText;
94 private PseudoExecutor executor;
96 private BidirectionalTopicOperation<MyRequest, MyResponse> oper;
102 public void setUp() throws CoderException {
103 when(config.getTopicHandler()).thenReturn(handler);
104 when(config.getForwarder()).thenReturn(forwarder);
105 when(config.getTimeoutMs()).thenReturn(TIMEOUT_MS);
107 when(handler.send(any())).thenReturn(true);
108 when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
110 executor = new PseudoExecutor();
112 params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
113 outcome = params.makeOutcome();
115 response = new MyResponse();
116 response.setRequestId(REQ_ID);
117 responseText = coder.encode(response);
118 stdResponse = coder.decode(responseText, StandardCoderObject.class);
122 oper = new MyOperation();
126 public void testConstructor_testGetTopicHandler_testGetForwarder_testGetTopicParams() {
127 assertEquals(ACTOR, oper.getActorName());
128 assertEquals(OPERATION, oper.getName());
129 assertSame(handler, oper.getTopicHandler());
130 assertSame(forwarder, oper.getForwarder());
131 assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
132 assertSame(MyResponse.class, oper.getResponseClass());
136 public void testStartOperationAsync() throws Exception {
138 // tell it to expect three responses
141 CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
142 assertFalse(future.isDone());
144 verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
146 verify(forwarder, never()).unregister(any(), any());
148 verify(handler).send(any());
150 // provide first response
151 listenerCaptor.getValue().accept(responseText, stdResponse);
152 assertTrue(executor.runAll(MAX_REQUESTS));
153 assertFalse(future.isDone());
155 // provide second response
156 listenerCaptor.getValue().accept(responseText, stdResponse);
157 assertTrue(executor.runAll(MAX_REQUESTS));
158 assertFalse(future.isDone());
160 // provide final response
161 listenerCaptor.getValue().accept(responseText, stdResponse);
162 assertTrue(executor.runAll(MAX_REQUESTS));
163 assertTrue(future.isDone());
165 assertSame(outcome, future.get());
166 assertEquals(OperationResult.SUCCESS, outcome.getResult());
167 assertEquals(response, outcome.getResponse());
169 verify(forwarder).unregister(Arrays.asList(REQ_ID), listenerCaptor.getValue());
173 * Tests startOperationAsync() when processResponse() throws an exception.
176 public void testStartOperationAsyncProcException() throws Exception {
177 oper = new MyOperation() {
179 protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
180 StandardCoderObject scoResponse) {
181 throw EXPECTED_EXCEPTION;
185 CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
186 assertFalse(future.isDone());
188 verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
190 verify(forwarder, never()).unregister(any(), any());
192 // provide a response
193 listenerCaptor.getValue().accept(responseText, stdResponse);
194 assertTrue(executor.runAll(MAX_REQUESTS));
195 assertTrue(future.isCompletedExceptionally());
197 verify(forwarder).unregister(Arrays.asList(REQ_ID), listenerCaptor.getValue());
201 * Tests startOperationAsync() when the publisher throws an exception.
204 public void testStartOperationAsyncPubException() throws Exception {
205 // indicate that nothing was published
206 when(handler.send(any())).thenReturn(false);
208 assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome));
210 verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
212 // must still unregister
213 verify(forwarder).unregister(Arrays.asList(REQ_ID), listenerCaptor.getValue());
217 public void testGetTimeoutMsInteger() {
219 assertEquals(TIMEOUT_MS, oper.getTimeoutMs(null));
220 assertEquals(TIMEOUT_MS, oper.getTimeoutMs(0));
222 // use provided value
223 assertEquals(5000, oper.getTimeoutMs(5));
227 public void testPublishRequest() {
228 assertThatCode(() -> oper.publishRequest(new MyRequest())).doesNotThrowAnyException();
232 * Tests publishRequest() when nothing is published.
235 public void testPublishRequestUnpublished() {
236 when(handler.send(any())).thenReturn(false);
237 assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
241 * Tests publishRequest() when the request type is a String.
244 public void testPublishRequestString() {
245 MyStringOperation oper2 = new MyStringOperation();
246 assertThatCode(() -> oper2.publishRequest(TEXT)).doesNotThrowAnyException();
250 * Tests publishRequest() when the coder throws an exception.
253 public void testPublishRequestException() {
254 setOperCoderException();
255 assertThatIllegalArgumentException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
259 * Tests processResponse() when it's a success and the response type is a String.
262 public void testProcessResponseSuccessString() {
263 MyStringOperation oper2 = new MyStringOperation();
265 assertSame(outcome, oper2.processResponse(outcome, TEXT, null));
266 assertEquals(OperationResult.SUCCESS, outcome.getResult());
267 assertEquals(TEXT, outcome.getResponse());
271 * Tests processResponse() when it's a success and the response type is a
272 * StandardCoderObject.
275 public void testProcessResponseSuccessSco() {
276 MyScoOperation oper2 = new MyScoOperation();
278 assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse));
279 assertEquals(OperationResult.SUCCESS, outcome.getResult());
280 assertEquals(stdResponse, outcome.getResponse());
284 * Tests processResponse() when it's a failure.
287 public void testProcessResponseFailure() throws CoderException {
288 // indicate error in the response
289 MyResponse resp = new MyResponse();
290 resp.setOutput("error");
292 responseText = coder.encode(resp);
293 stdResponse = coder.decode(responseText, StandardCoderObject.class);
295 assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
296 assertEquals(OperationResult.FAILURE, outcome.getResult());
297 assertEquals(resp, outcome.getResponse());
301 * Tests processResponse() when the decoder succeeds.
304 public void testProcessResponseDecodeOk() throws CoderException {
305 assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
306 assertEquals(OperationResult.SUCCESS, outcome.getResult());
307 assertEquals(response, outcome.getResponse());
311 * Tests processResponse() when the decoder throws an exception.
314 public void testProcessResponseDecodeExcept() throws CoderException {
316 assertThatIllegalArgumentException().isThrownBy(
317 () -> oper.processResponse(outcome, "{invalid json", stdResponse));
322 public void testPostProcessResponse() {
323 assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException();
327 public void testGetCoder() {
328 assertNotNull(oper.getCoder());
332 * Creates a new {@link #oper} whose coder will throw an exception.
334 private void setOperCoderException() {
335 oper = new MyOperation() {
337 protected Coder getCoder() {
338 return new StandardCoder() {
340 public String encode(Object object, boolean pretty) throws CoderException {
341 throw new CoderException(EXPECTED_EXCEPTION);
350 public static class MyRequest {
351 private String theRequestId = REQ_ID;
352 private String input;
358 public static class MyResponse {
359 private String requestId;
360 private String output;
364 private class MyStringOperation extends BidirectionalTopicOperation<String, String> {
366 public MyStringOperation() {
367 super(BidirectionalTopicOperationTest.this.params, config, String.class, Collections.emptyList());
371 protected String makeRequest(int attempt) {
376 protected List<String> getExpectedKeyValues(int attempt, String request) {
377 return Arrays.asList(REQ_ID);
381 protected Status detmStatus(String rawResponse, String response) {
382 return (response != null ? Status.SUCCESS : Status.FAILURE);
387 private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
388 public MyScoOperation() {
389 super(BidirectionalTopicOperationTest.this.params, config, StandardCoderObject.class,
390 Collections.emptyList());
394 protected MyRequest makeRequest(int attempt) {
395 return new MyRequest();
399 protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
400 return Arrays.asList(REQ_ID);
404 protected Status detmStatus(String rawResponse, StandardCoderObject response) {
405 return (response.getString("output") == null ? Status.SUCCESS : Status.FAILURE);
410 private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
/**
 * Constructs the operation from the enclosing test's parameters and mocked configuration,
 * with {@link MyResponse} as the response class and an empty list as the final argument.
 */
public MyOperation() {
    super(BidirectionalTopicOperationTest.this.params, config, MyResponse.class, Collections.emptyList());
}
416 protected MyRequest makeRequest(int attempt) {
417 return new MyRequest();
421 protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
422 return Arrays.asList(REQ_ID);
426 protected Status detmStatus(String rawResponse, MyResponse response) {
428 return (response.getOutput() == null ? Status.SUCCESS : Status.FAILURE);
431 return Status.STILL_WAITING;