/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.ArgumentMatchers.eq;
33 import static org.mockito.Mockito.never;
34 import static org.mockito.Mockito.verify;
35 import static org.mockito.Mockito.when;
37 import java.util.Arrays;
38 import java.util.List;
39 import java.util.concurrent.CompletableFuture;
40 import java.util.function.BiConsumer;
43 import org.junit.Before;
44 import org.junit.Test;
45 import org.mockito.ArgumentCaptor;
46 import org.mockito.Captor;
47 import org.mockito.Mock;
48 import org.mockito.MockitoAnnotations;
49 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
50 import org.onap.policy.common.utils.coder.Coder;
51 import org.onap.policy.common.utils.coder.CoderException;
52 import org.onap.policy.common.utils.coder.StandardCoder;
53 import org.onap.policy.common.utils.coder.StandardCoderObject;
54 import org.onap.policy.common.utils.time.PseudoExecutor;
55 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
56 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
57 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
58 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
59 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
60 import org.onap.policy.controlloop.policy.PolicyResult;
62 public class BidirectionalTopicOperationTest {
63 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
64 private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
65 private static final String ACTOR = "my-actor";
66 private static final String OPERATION = "my-operation";
67 private static final String REQ_ID = "my-request-id";
68 private static final String MY_SINK = "my-sink";
69 private static final String MY_SOURCE = "my-source";
70 private static final String TEXT = "some text";
71 private static final int TIMEOUT_SEC = 10;
72 private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
73 private static final int MAX_REQUESTS = 100;
75 private static final StandardCoder coder = new StandardCoder();
78 private BidirectionalTopicOperator operator;
80 private BidirectionalTopicHandler handler;
82 private Forwarder forwarder;
85 private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
87 private ControlLoopOperationParams params;
88 private BidirectionalTopicParams topicParams;
89 private OperationOutcome outcome;
90 private StandardCoderObject stdResponse;
91 private String responseText;
92 private PseudoExecutor executor;
94 private BidirectionalTopicOperation<MyRequest, MyResponse> oper;
100 public void setUp() throws CoderException {
101 MockitoAnnotations.initMocks(this);
103 topicParams = BidirectionalTopicParams.builder().sourceTopic(MY_SOURCE).sinkTopic(MY_SINK)
104 .timeoutSec(TIMEOUT_SEC).build();
106 when(operator.getActorName()).thenReturn(ACTOR);
107 when(operator.getName()).thenReturn(OPERATION);
108 when(operator.getTopicHandler()).thenReturn(handler);
109 when(operator.getForwarder()).thenReturn(forwarder);
110 when(operator.getParams()).thenReturn(topicParams);
111 when(operator.isAlive()).thenReturn(true);
113 when(handler.send(any())).thenReturn(true);
114 when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
116 executor = new PseudoExecutor();
118 params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
119 outcome = params.makeOutcome();
121 responseText = coder.encode(new MyResponse());
122 stdResponse = coder.decode(responseText, StandardCoderObject.class);
126 oper = new MyOperation();
130 public void testConstructor_testGetTopicHandler_testGetForwarder_testGetTopicParams() {
131 assertEquals(ACTOR, oper.getActorName());
132 assertEquals(OPERATION, oper.getName());
133 assertSame(handler, oper.getTopicHandler());
134 assertSame(forwarder, oper.getForwarder());
135 assertSame(topicParams, oper.getTopicParams());
136 assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
137 assertSame(MyResponse.class, oper.getResponseClass());
141 public void testStartOperationAsync() throws Exception {
143 // tell it to expect three responses
146 CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
147 assertFalse(future.isDone());
149 verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
151 verify(forwarder, never()).unregister(any(), any());
153 verify(handler).send(any());
155 // provide first response
156 listenerCaptor.getValue().accept(responseText, stdResponse);
157 assertTrue(executor.runAll(MAX_REQUESTS));
158 assertFalse(future.isDone());
160 // provide second response
161 listenerCaptor.getValue().accept(responseText, stdResponse);
162 assertTrue(executor.runAll(MAX_REQUESTS));
163 assertFalse(future.isDone());
165 // provide final response
166 listenerCaptor.getValue().accept(responseText, stdResponse);
167 assertTrue(executor.runAll(MAX_REQUESTS));
168 assertTrue(future.isDone());
170 assertSame(outcome, future.get());
171 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
173 verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
177 * Tests startOperationAsync() when the publisher throws an exception.
180 public void testStartOperationAsyncException() throws Exception {
181 // indicate that nothing was published
182 when(handler.send(any())).thenReturn(false);
184 assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome));
186 verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
188 // must still unregister
189 verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
193 public void testGetTimeoutMsInteger() {
195 assertEquals(TIMEOUT_MS, oper.getTimeoutMs(null));
196 assertEquals(TIMEOUT_MS, oper.getTimeoutMs(0));
198 // use provided value
199 assertEquals(5000, oper.getTimeoutMs(5));
203 public void testPublishRequest() {
204 assertThatCode(() -> oper.publishRequest(new MyRequest())).doesNotThrowAnyException();
208 * Tests publishRequest() when nothing is published.
211 public void testPublishRequestUnpublished() {
212 when(handler.send(any())).thenReturn(false);
213 assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
217 * Tests publishRequest() when the request type is a String.
220 public void testPublishRequestString() {
221 MyStringOperation oper2 = new MyStringOperation();
222 assertThatCode(() -> oper2.publishRequest(TEXT)).doesNotThrowAnyException();
226 * Tests publishRequest() when the coder throws an exception.
229 public void testPublishRequestException() {
230 setOperCoderException();
231 assertThatIllegalArgumentException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
235 * Tests processResponse() when it's a success and the response type is a String.
238 public void testProcessResponseSuccessString() {
239 MyStringOperation oper2 = new MyStringOperation();
241 assertSame(outcome, oper2.processResponse(outcome, TEXT, null));
242 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
246 * Tests processResponse() when it's a success and the response type is a
247 * StandardCoderObject.
250 public void testProcessResponseSuccessSco() {
251 MyScoOperation oper2 = new MyScoOperation();
253 assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse));
254 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
258 * Tests processResponse() when it's a failure.
261 public void testProcessResponseFailure() throws CoderException {
262 // indicate error in the response
263 MyResponse resp = new MyResponse();
264 resp.setOutput("error");
266 responseText = coder.encode(resp);
267 stdResponse = coder.decode(responseText, StandardCoderObject.class);
269 assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
270 assertEquals(PolicyResult.FAILURE, outcome.getResult());
274 * Tests processResponse() when the decoder succeeds.
277 public void testProcessResponseDecodeOk() throws CoderException {
278 assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
279 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
283 * Tests processResponse() when the decoder throws an exception.
286 public void testProcessResponseDecodeExcept() throws CoderException {
288 assertThatIllegalArgumentException().isThrownBy(
289 () -> oper.processResponse(outcome, "{invalid json", stdResponse));
294 public void testPostProcessResponse() {
295 assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException();
299 public void testMakeCoder() {
300 assertNotNull(oper.makeCoder());
304 * Creates a new {@link #oper} whose coder will throw an exception.
306 private void setOperCoderException() {
307 oper = new MyOperation() {
309 protected Coder makeCoder() {
310 return new StandardCoder() {
312 public String encode(Object object, boolean pretty) throws CoderException {
313 throw new CoderException(EXPECTED_EXCEPTION);
322 public static class MyRequest {
323 private String theRequestId = REQ_ID;
324 private String input;
329 public static class MyResponse {
330 private String requestId = REQ_ID;
331 private String output;
335 private class MyStringOperation extends BidirectionalTopicOperation<String, String> {
336 public MyStringOperation() {
337 super(BidirectionalTopicOperationTest.this.params, operator, String.class);
341 protected String makeRequest(int attempt) {
346 protected List<String> getExpectedKeyValues(int attempt, String request) {
347 return Arrays.asList(REQ_ID);
351 protected Status detmStatus(String rawResponse, String response) {
352 return (response != null ? Status.SUCCESS : Status.FAILURE);
357 private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
358 public MyScoOperation() {
359 super(BidirectionalTopicOperationTest.this.params, operator, StandardCoderObject.class);
363 protected MyRequest makeRequest(int attempt) {
364 return new MyRequest();
368 protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
369 return Arrays.asList(REQ_ID);
373 protected Status detmStatus(String rawResponse, StandardCoderObject response) {
374 return (response.getString("output") == null ? Status.SUCCESS : Status.FAILURE);
379 private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
380 public MyOperation() {
381 super(BidirectionalTopicOperationTest.this.params, operator, MyResponse.class);
385 protected MyRequest makeRequest(int attempt) {
386 return new MyRequest();
390 protected List<String> getExpectedKeyValues(int attempt, MyRequest request) {
391 return Arrays.asList(REQ_ID);
395 protected Status detmStatus(String rawResponse, MyResponse response) {
397 return (response.getOutput() == null ? Status.SUCCESS : Status.FAILURE);
400 return Status.STILL_WAITING;