import java.util.function.BiConsumer;
import lombok.Getter;
import lombok.Setter;
+import org.apache.commons.lang3.tuple.Pair;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.common.utils.time.PseudoExecutor;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
private static final String ACTOR = "my-actor";
private static final String OPERATION = "my-operation";
private static final String REQ_ID = "my-request-id";
- private static final String MY_SINK = "my-sink";
- private static final String MY_SOURCE = "my-source";
private static final String TEXT = "some text";
+ private static final String SUB_REQID = "my-sub-request-id";
private static final int TIMEOUT_SEC = 10;
private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
private static final int MAX_REQUESTS = 100;
private static final StandardCoder coder = new StandardCoder();
@Mock
- private BidirectionalTopicOperator operator;
+ private BidirectionalTopicConfig config;
@Mock
private BidirectionalTopicHandler handler;
@Mock
private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
private ControlLoopOperationParams params;
- private BidirectionalTopicParams topicParams;
private OperationOutcome outcome;
private StandardCoderObject stdResponse;
private String responseText;
public void setUp() throws CoderException {
MockitoAnnotations.initMocks(this);
- topicParams = BidirectionalTopicParams.builder().sourceTopic(MY_SOURCE).sinkTopic(MY_SINK)
- .timeoutSec(TIMEOUT_SEC).build();
-
- when(operator.getActorName()).thenReturn(ACTOR);
- when(operator.getName()).thenReturn(OPERATION);
- when(operator.getTopicHandler()).thenReturn(handler);
- when(operator.getForwarder()).thenReturn(forwarder);
- when(operator.getParams()).thenReturn(topicParams);
- when(operator.isAlive()).thenReturn(true);
+ when(config.getTopicHandler()).thenReturn(handler);
+ when(config.getForwarder()).thenReturn(forwarder);
+ when(config.getTimeoutMs()).thenReturn(TIMEOUT_MS);
when(handler.send(any())).thenReturn(true);
when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
assertEquals(OPERATION, oper.getName());
assertSame(handler, oper.getTopicHandler());
assertSame(forwarder, oper.getForwarder());
- assertSame(topicParams, oper.getTopicParams());
assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
assertSame(MyResponse.class, oper.getResponseClass());
}
CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
assertFalse(future.isDone());
+ assertEquals(SUB_REQID, outcome.getSubRequestId());
+
verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
verify(forwarder, never()).unregister(any(), any());
verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
}
+ /**
+ * Tests startOperationAsync() when processResponse() throws an exception.
+ */
+ @Test
+ public void testStartOperationAsyncProcException() throws Exception {
+ oper = new MyOperation() {
+ @Override
+ protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
+ StandardCoderObject scoResponse) {
+ throw EXPECTED_EXCEPTION;
+ }
+ };
+
+ CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
+ assertFalse(future.isDone());
+
+ assertEquals(SUB_REQID, outcome.getSubRequestId());
+
+ verify(forwarder).register(eq(Arrays.asList(REQ_ID)), listenerCaptor.capture());
+
+ verify(forwarder, never()).unregister(any(), any());
+
+ // provide a response
+ listenerCaptor.getValue().accept(responseText, stdResponse);
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(future.isCompletedExceptionally());
+
+ verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
+ }
+
/**
* Tests startOperationAsync() when the publisher throws an exception.
*/
@Test
- public void testStartOperationAsyncException() throws Exception {
+ public void testStartOperationAsyncPubException() throws Exception {
// indicate that nothing was published
when(handler.send(any())).thenReturn(false);
private class MyStringOperation extends BidirectionalTopicOperation<String, String> {
+
public MyStringOperation() {
- super(BidirectionalTopicOperationTest.this.params, operator, String.class);
+ super(BidirectionalTopicOperationTest.this.params, config, String.class);
}
@Override
- protected String makeRequest(int attempt) {
- return TEXT;
+ protected Pair<String, String> makeRequest(int attempt) {
+ return Pair.of(SUB_REQID, TEXT);
}
@Override
private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
public MyScoOperation() {
- super(BidirectionalTopicOperationTest.this.params, operator, StandardCoderObject.class);
+ super(BidirectionalTopicOperationTest.this.params, config, StandardCoderObject.class);
}
@Override
- protected MyRequest makeRequest(int attempt) {
- return new MyRequest();
+ protected Pair<String, MyRequest> makeRequest(int attempt) {
+ return Pair.of(SUB_REQID, new MyRequest());
}
@Override
private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
public MyOperation() {
- super(BidirectionalTopicOperationTest.this.params, operator, MyResponse.class);
+ super(BidirectionalTopicOperationTest.this.params, config, MyResponse.class);
}
@Override
- protected MyRequest makeRequest(int attempt) {
- return new MyRequest();
+ protected Pair<String, MyRequest> makeRequest(int attempt) {
+ return Pair.of(SUB_REQID, new MyRequest());
}
@Override