import static org.mockito.Mockito.when;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.junit.Before;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.common.utils.time.PseudoExecutor;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
+import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.policy.PolicyResult;
public class BidirectionalTopicOperationTest {
private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
private static final String ACTOR = "my-actor";
private static final String OPERATION = "my-operation";
private static final String REQ_ID = "my-request-id";
- private static final String MY_SINK = "my-sink";
- private static final String MY_SOURCE = "my-source";
private static final String TEXT = "some text";
private static final int TIMEOUT_SEC = 10;
private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
private static final StandardCoder coder = new StandardCoder();
@Mock
- private BidirectionalTopicOperator operator;
+ private BidirectionalTopicConfig config;
@Mock
private BidirectionalTopicHandler handler;
@Mock
private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
private ControlLoopOperationParams params;
- private BidirectionalTopicParams topicParams;
private OperationOutcome outcome;
private StandardCoderObject stdResponse;
+ private MyResponse response;
private String responseText;
private PseudoExecutor executor;
private int ntimes;
public void setUp() throws CoderException {
MockitoAnnotations.initMocks(this);
- topicParams = BidirectionalTopicParams.builder().sourceTopic(MY_SOURCE).sinkTopic(MY_SINK)
- .timeoutSec(TIMEOUT_SEC).build();
-
- when(operator.getActorName()).thenReturn(ACTOR);
- when(operator.getName()).thenReturn(OPERATION);
- when(operator.getTopicHandler()).thenReturn(handler);
- when(operator.getForwarder()).thenReturn(forwarder);
- when(operator.getParams()).thenReturn(topicParams);
- when(operator.isAlive()).thenReturn(true);
+ when(config.getTopicHandler()).thenReturn(handler);
+ when(config.getForwarder()).thenReturn(forwarder);
+ when(config.getTimeoutMs()).thenReturn(TIMEOUT_MS);
when(handler.send(any())).thenReturn(true);
when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
executor = new PseudoExecutor();
params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
- outcome = params.makeOutcome();
+ outcome = params.makeOutcome(null);
- responseText = coder.encode(new MyResponse());
+ response = new MyResponse();
+ response.setRequestId(REQ_ID);
+ responseText = coder.encode(response);
stdResponse = coder.decode(responseText, StandardCoderObject.class);
ntimes = 1;
assertEquals(OPERATION, oper.getName());
assertSame(handler, oper.getTopicHandler());
assertSame(forwarder, oper.getForwarder());
- assertSame(topicParams, oper.getTopicParams());
assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
assertSame(MyResponse.class, oper.getResponseClass());
}
assertTrue(future.isDone());
assertSame(outcome, future.get());
- assertEquals(PolicyResult.SUCCESS, outcome.getResult());
+ assertEquals(OperationResult.SUCCESS, outcome.getResult());
+ assertEquals(response, outcome.getResponse());
+
+ verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
+ }
+
+ /**
+ * Tests startOperationAsync() when processResponse() throws an exception.
+ */
+ @Test
+ public void testStartOperationAsyncProcException() throws Exception {
+ oper = new MyOperation() {
+ @Override
+ protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
+ StandardCoderObject scoResponse) {
+ throw EXPECTED_EXCEPTION;
+ }
+ };
+
+ CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
+ assertFalse(future.isDone());
+
+ verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
+
+ verify(forwarder, never()).unregister(any(), any());
+
+ // provide a response
+ listenerCaptor.getValue().accept(responseText, stdResponse);
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(future.isCompletedExceptionally());
verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
}
* Tests startOperationAsync() when the publisher throws an exception.
*/
@Test
- public void testStartOperationAsyncException() throws Exception {
+ public void testStartOperationAsyncPubException() throws Exception {
// indicate that nothing was published
when(handler.send(any())).thenReturn(false);
MyStringOperation oper2 = new MyStringOperation();
assertSame(outcome, oper2.processResponse(outcome, TEXT, null));
- assertEquals(PolicyResult.SUCCESS, outcome.getResult());
+ assertEquals(OperationResult.SUCCESS, outcome.getResult());
+ assertEquals(TEXT, outcome.getResponse());
}
/**
MyScoOperation oper2 = new MyScoOperation();
assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse));
- assertEquals(PolicyResult.SUCCESS, outcome.getResult());
+ assertEquals(OperationResult.SUCCESS, outcome.getResult());
+ assertEquals(stdResponse, outcome.getResponse());
}
/**
stdResponse = coder.decode(responseText, StandardCoderObject.class);
assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
- assertEquals(PolicyResult.FAILURE, outcome.getResult());
+ assertEquals(OperationResult.FAILURE, outcome.getResult());
+ assertEquals(resp, outcome.getResponse());
}
/**
@Test
public void testProcessResponseDecodeOk() throws CoderException {
assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
- assertEquals(PolicyResult.SUCCESS, outcome.getResult());
+ assertEquals(OperationResult.SUCCESS, outcome.getResult());
+ assertEquals(response, outcome.getResponse());
}
/**
}
@Test
- public void testMakeCoder() {
- assertNotNull(oper.makeCoder());
+ public void testGetCoder() {
+ assertNotNull(oper.getCoder());
}
/**
private void setOperCoderException() {
oper = new MyOperation() {
@Override
- protected Coder makeCoder() {
+ protected Coder getCoder() {
return new StandardCoder() {
@Override
public String encode(Object object, boolean pretty) throws CoderException {
@Getter
@Setter
+ @EqualsAndHashCode
public static class MyResponse {
- private String requestId = REQ_ID;
+ private String requestId;
private String output;
}
private class MyStringOperation extends BidirectionalTopicOperation<String, String> {
+
public MyStringOperation() {
- super(BidirectionalTopicOperationTest.this.params, operator, String.class);
+ super(BidirectionalTopicOperationTest.this.params, config, String.class, Collections.emptyList());
}
@Override
private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
public MyScoOperation() {
- super(BidirectionalTopicOperationTest.this.params, operator, StandardCoderObject.class);
+ super(BidirectionalTopicOperationTest.this.params, config, StandardCoderObject.class,
+ Collections.emptyList());
}
@Override
private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
public MyOperation() {
- super(BidirectionalTopicOperationTest.this.params, operator, MyResponse.class);
+ super(BidirectionalTopicOperationTest.this.params, config, MyResponse.class, Collections.emptyList());
}
@Override