package org.onap.policy.controlloop.actorserviceprovider.impl;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import ch.qos.logback.classic.Logger;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import lombok.Getter;
import lombok.Setter;
-import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.onap.policy.common.utils.time.PseudoExecutor;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
import org.onap.policy.controlloop.policy.PolicyResult;
-import org.slf4j.LoggerFactory;
-public class TopicPairOperationTest {
- private static final List<CommInfrastructure> INFRA_LIST =
- Arrays.asList(CommInfrastructure.NOOP, CommInfrastructure.UEB);
+public class BidirectionalTopicOperationTest {
+ private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
private static final String ACTOR = "my-actor";
private static final String OPERATION = "my-operation";
private static final String REQ_ID = "my-request-id";
+ private static final String MY_SINK = "my-sink";
private static final String MY_SOURCE = "my-source";
- private static final String MY_TARGET = "my-target";
private static final String TEXT = "some text";
private static final int TIMEOUT_SEC = 10;
private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
+ private static final int MAX_REQUESTS = 100;
private static final StandardCoder coder = new StandardCoder();
- /**
- * Used to attach an appender to the class' logger.
- */
- private static final Logger logger = (Logger) LoggerFactory.getLogger(TopicPairOperation.class);
- private static final ExtractAppender appender = new ExtractAppender();
-
@Mock
- private TopicPairOperator operator;
+ private BidirectionalTopicOperator operator;
@Mock
- private TopicPair pair;
+ private BidirectionalTopicHandler handler;
@Mock
private Forwarder forwarder;
@Captor
- private ArgumentCaptor<TriConsumer<CommInfrastructure, String, StandardCoderObject>> listenerCaptor;
+ private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
private ControlLoopOperationParams params;
- private TopicPairParams topicParams;
+ private BidirectionalTopicParams topicParams;
private OperationOutcome outcome;
private StandardCoderObject stdResponse;
private String responseText;
- private MyExec executor;
- private TopicPairOperation<MyRequest, MyResponse> oper;
-
- /**
- * Attaches the appender to the logger.
- */
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- /**
- * Attach appender to the logger.
- */
- appender.setContext(logger.getLoggerContext());
- appender.start();
-
- logger.addAppender(appender);
- }
-
- /**
- * Stops the appender.
- */
- @AfterClass
- public static void tearDownAfterClass() {
- appender.stop();
- }
+ private PseudoExecutor executor;
+ private int ntimes;
+ private BidirectionalTopicOperation<MyRequest, MyResponse> oper;
/**
* Sets up.
public void setUp() throws CoderException {
MockitoAnnotations.initMocks(this);
- appender.clearExtractions();
-
- topicParams = TopicPairParams.builder().source(MY_SOURCE).target(MY_TARGET).timeoutSec(TIMEOUT_SEC).build();
+ topicParams = BidirectionalTopicParams.builder().sourceTopic(MY_SOURCE).sinkTopic(MY_SINK)
+ .timeoutSec(TIMEOUT_SEC).build();
when(operator.getActorName()).thenReturn(ACTOR);
when(operator.getName()).thenReturn(OPERATION);
- when(operator.getTopicPair()).thenReturn(pair);
+ when(operator.getTopicHandler()).thenReturn(handler);
when(operator.getForwarder()).thenReturn(forwarder);
when(operator.getParams()).thenReturn(topicParams);
when(operator.isAlive()).thenReturn(true);
- when(pair.publish(any())).thenReturn(INFRA_LIST);
+ when(handler.send(any())).thenReturn(true);
+ when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
- executor = new MyExec(100);
+ executor = new PseudoExecutor();
params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
outcome = params.makeOutcome();
responseText = coder.encode(new MyResponse());
stdResponse = coder.decode(responseText, StandardCoderObject.class);
+ ntimes = 1;
+
oper = new MyOperation();
}
@Test
- public void testTopicPairOperation_testGetTopicPair_testGetForwarder_testGetPairParams() {
+ public void testConstructor_testGetTopicHandler_testGetForwarder_testGetTopicParams() {
assertEquals(ACTOR, oper.getActorName());
assertEquals(OPERATION, oper.getName());
- assertSame(pair, oper.getTopicPair());
+ assertSame(handler, oper.getTopicHandler());
assertSame(forwarder, oper.getForwarder());
- assertSame(topicParams, oper.getPairParams());
+ assertSame(topicParams, oper.getTopicParams());
assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
assertSame(MyResponse.class, oper.getResponseClass());
}
@Test
public void testStartOperationAsync() throws Exception {
+
+ // tell it to expect three responses
+ ntimes = 3;
+
CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
assertFalse(future.isDone());
verify(forwarder, never()).unregister(any(), any());
- verify(pair).publish(any());
+ verify(handler).send(any());
- // provide the response
- listenerCaptor.getValue().accept(CommInfrastructure.NOOP, responseText, stdResponse);
+ // provide first response
+ listenerCaptor.getValue().accept(responseText, stdResponse);
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertFalse(future.isDone());
- // run the tasks
- assertTrue(executor.runAll());
+ // provide second response
+ listenerCaptor.getValue().accept(responseText, stdResponse);
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertFalse(future.isDone());
+ // provide final response
+ listenerCaptor.getValue().accept(responseText, stdResponse);
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(future.isDone());
- assertSame(outcome, future.get(5, TimeUnit.SECONDS));
+ assertSame(outcome, future.get());
assertEquals(PolicyResult.SUCCESS, outcome.getResult());
verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
@Test
public void testStartOperationAsyncException() throws Exception {
// indicate that nothing was published
- when(pair.publish(any())).thenReturn(Arrays.asList());
+ when(handler.send(any())).thenReturn(false);
assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome));
@Test
public void testPublishRequest() {
- oper.publishRequest(new MyRequest());
- assertEquals(INFRA_LIST.size(), appender.getExtracted().size());
+ assertThatCode(() -> oper.publishRequest(new MyRequest())).doesNotThrowAnyException();
}
/**
*/
@Test
public void testPublishRequestUnpublished() {
- when(pair.publish(any())).thenReturn(Arrays.asList());
+ when(handler.send(any())).thenReturn(false);
assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
}
@Test
public void testPublishRequestString() {
MyStringOperation oper2 = new MyStringOperation();
- oper2.publishRequest(TEXT);
- assertEquals(INFRA_LIST.size(), appender.getExtracted().size());
+ assertThatCode(() -> oper2.publishRequest(TEXT)).doesNotThrowAnyException();
}
/**
public void testProcessResponseSuccessString() {
MyStringOperation oper2 = new MyStringOperation();
- assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, TEXT, null));
+ assertSame(outcome, oper2.processResponse(outcome, TEXT, null));
assertEquals(PolicyResult.SUCCESS, outcome.getResult());
}
public void testProcessResponseSuccessSco() {
MyScoOperation oper2 = new MyScoOperation();
- assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+ assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse));
assertEquals(PolicyResult.SUCCESS, outcome.getResult());
}
responseText = coder.encode(resp);
stdResponse = coder.decode(responseText, StandardCoderObject.class);
- assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+ assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
assertEquals(PolicyResult.FAILURE, outcome.getResult());
}
*/
@Test
public void testProcessResponseDecodeOk() throws CoderException {
- assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+ assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
assertEquals(PolicyResult.SUCCESS, outcome.getResult());
}
public void testProcessResponseDecodeExcept() throws CoderException {
// @formatter:off
assertThatIllegalArgumentException().isThrownBy(
- () -> oper.processResponse(CommInfrastructure.NOOP, outcome, "{invalid json", stdResponse));
+ () -> oper.processResponse(outcome, "{invalid json", stdResponse));
// @formatter:on
}
assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException();
}
- @Test
- public void testLogTopicRequest() {
- // nothing to log
- appender.clearExtractions();
- oper.logTopicRequest(Arrays.asList(), new MyRequest());
- assertEquals(0, appender.getExtracted().size());
-
- // log structured data
- appender.clearExtractions();
- oper.logTopicRequest(INFRA_LIST, new MyRequest());
- List<String> output = appender.getExtracted();
- assertEquals(2, output.size());
-
- assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString())
- .contains("{\n \"theRequestId\": \"my-request-id\"\n}");
-
- assertThat(output.get(1)).contains(CommInfrastructure.UEB.toString())
- .contains("{\n \"theRequestId\": \"my-request-id\"\n}");
-
- // log a plain string
- appender.clearExtractions();
- new MyStringOperation().logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), TEXT);
- output = appender.getExtracted();
- assertEquals(1, output.size());
- assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT);
-
- // log a null request
- appender.clearExtractions();
- oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), null);
- output = appender.getExtracted();
- assertEquals(1, output.size());
-
- assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null");
-
- // exception from coder
- setOperCoderException();
-
- appender.clearExtractions();
- oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), new MyRequest());
- output = appender.getExtracted();
- assertEquals(2, output.size());
- assertThat(output.get(0)).contains("cannot pretty-print request");
- assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString());
- }
-
- @Test
- public void testLogTopicResponse() {
- // log structured data
- appender.clearExtractions();
- oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse());
- List<String> output = appender.getExtracted();
- assertEquals(1, output.size());
-
- assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString())
- .contains("{\n \"requestId\": \"my-request-id\"\n}");
-
- // log a plain string
- appender.clearExtractions();
- new MyStringOperation().logTopicResponse(CommInfrastructure.NOOP, TEXT);
- output = appender.getExtracted();
- assertEquals(1, output.size());
- assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT);
-
- // log a null response
- appender.clearExtractions();
- oper.logTopicResponse(CommInfrastructure.NOOP, null);
- output = appender.getExtracted();
- assertEquals(1, output.size());
-
- assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null");
-
- // exception from coder
- setOperCoderException();
-
- appender.clearExtractions();
- oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse());
- output = appender.getExtracted();
- assertEquals(2, output.size());
- assertThat(output.get(0)).contains("cannot pretty-print response");
- assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString());
- }
-
@Test
public void testMakeCoder() {
assertNotNull(oper.makeCoder());
}
- private class MyStringOperation extends TopicPairOperation<String, String> {
+ private class MyStringOperation extends BidirectionalTopicOperation<String, String> {
public MyStringOperation() {
- super(TopicPairOperationTest.this.params, operator, String.class);
+ super(BidirectionalTopicOperationTest.this.params, operator, String.class);
}
@Override
}
@Override
- protected boolean isSuccess(String rawResponse, String response) {
- return (response != null);
+ protected Status detmStatus(String rawResponse, String response) {
+ return (response != null ? Status.SUCCESS : Status.FAILURE);
}
}
- private class MyScoOperation extends TopicPairOperation<MyRequest, StandardCoderObject> {
+ private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
public MyScoOperation() {
- super(TopicPairOperationTest.this.params, operator, StandardCoderObject.class);
+ super(BidirectionalTopicOperationTest.this.params, operator, StandardCoderObject.class);
}
@Override
}
@Override
- protected boolean isSuccess(String rawResponse, StandardCoderObject response) {
- return (response.getString("output") == null);
+ protected Status detmStatus(String rawResponse, StandardCoderObject response) {
+ return (response.getString("output") == null ? Status.SUCCESS : Status.FAILURE);
}
}
- private class MyOperation extends TopicPairOperation<MyRequest, MyResponse> {
+ private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
public MyOperation() {
- super(TopicPairOperationTest.this.params, operator, MyResponse.class);
+ super(BidirectionalTopicOperationTest.this.params, operator, MyResponse.class);
}
@Override
}
@Override
- protected boolean isSuccess(String rawResponse, MyResponse response) {
- return (response.getOutput() == null);
+ protected Status detmStatus(String rawResponse, MyResponse response) {
+ if (--ntimes <= 0) {
+ return (response.getOutput() == null ? Status.SUCCESS : Status.FAILURE);
+ }
+
+ return Status.STILL_WAITING;
}
}
}