Use BidirectionalTopicClient from policy-common
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperationTest.java
@@ -20,7 +20,6 @@
 
 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
@@ -35,96 +34,64 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import ch.qos.logback.classic.Logger;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import lombok.Getter;
 import lombok.Setter;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
 import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.onap.policy.common.utils.time.PseudoExecutor;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
 import org.onap.policy.controlloop.policy.PolicyResult;
-import org.slf4j.LoggerFactory;
 
-public class TopicPairOperationTest {
-    private static final List<CommInfrastructure> INFRA_LIST =
-                    Arrays.asList(CommInfrastructure.NOOP, CommInfrastructure.UEB);
+public class BidirectionalTopicOperationTest {
+    private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
     private static final String ACTOR = "my-actor";
     private static final String OPERATION = "my-operation";
     private static final String REQ_ID = "my-request-id";
+    private static final String MY_SINK = "my-sink";
     private static final String MY_SOURCE = "my-source";
-    private static final String MY_TARGET = "my-target";
     private static final String TEXT = "some text";
     private static final int TIMEOUT_SEC = 10;
     private static final long TIMEOUT_MS = 1000 * TIMEOUT_SEC;
+    private static final int MAX_REQUESTS = 100;
 
     private static final StandardCoder coder = new StandardCoder();
 
-    /**
-     * Used to attach an appender to the class' logger.
-     */
-    private static final Logger logger = (Logger) LoggerFactory.getLogger(TopicPairOperation.class);
-    private static final ExtractAppender appender = new ExtractAppender();
-
     @Mock
-    private TopicPairOperator operator;
+    private BidirectionalTopicOperator operator;
     @Mock
-    private TopicPair pair;
+    private BidirectionalTopicHandler handler;
     @Mock
     private Forwarder forwarder;
 
     @Captor
-    private ArgumentCaptor<TriConsumer<CommInfrastructure, String, StandardCoderObject>> listenerCaptor;
+    private ArgumentCaptor<BiConsumer<String, StandardCoderObject>> listenerCaptor;
 
     private ControlLoopOperationParams params;
-    private TopicPairParams topicParams;
+    private BidirectionalTopicParams topicParams;
     private OperationOutcome outcome;
     private StandardCoderObject stdResponse;
     private String responseText;
-    private MyExec executor;
-    private TopicPairOperation<MyRequest, MyResponse> oper;
-
-    /**
-     * Attaches the appender to the logger.
-     */
-    @BeforeClass
-    public static void setUpBeforeClass() throws Exception {
-        /**
-         * Attach appender to the logger.
-         */
-        appender.setContext(logger.getLoggerContext());
-        appender.start();
-
-        logger.addAppender(appender);
-    }
-
-    /**
-     * Stops the appender.
-     */
-    @AfterClass
-    public static void tearDownAfterClass() {
-        appender.stop();
-    }
+    private PseudoExecutor executor;
+    private int ntimes;
+    private BidirectionalTopicOperation<MyRequest, MyResponse> oper;
 
     /**
      * Sets up.
@@ -133,20 +100,20 @@ public class TopicPairOperationTest {
     public void setUp() throws CoderException {
         MockitoAnnotations.initMocks(this);
 
-        appender.clearExtractions();
-
-        topicParams = TopicPairParams.builder().source(MY_SOURCE).target(MY_TARGET).timeoutSec(TIMEOUT_SEC).build();
+        topicParams = BidirectionalTopicParams.builder().sourceTopic(MY_SOURCE).sinkTopic(MY_SINK)
+                        .timeoutSec(TIMEOUT_SEC).build();
 
         when(operator.getActorName()).thenReturn(ACTOR);
         when(operator.getName()).thenReturn(OPERATION);
-        when(operator.getTopicPair()).thenReturn(pair);
+        when(operator.getTopicHandler()).thenReturn(handler);
         when(operator.getForwarder()).thenReturn(forwarder);
         when(operator.getParams()).thenReturn(topicParams);
         when(operator.isAlive()).thenReturn(true);
 
-        when(pair.publish(any())).thenReturn(INFRA_LIST);
+        when(handler.send(any())).thenReturn(true);
+        when(handler.getSinkTopicCommInfrastructure()).thenReturn(SINK_INFRA);
 
-        executor = new MyExec(100);
+        executor = new PseudoExecutor();
 
         params = ControlLoopOperationParams.builder().actor(ACTOR).operation(OPERATION).executor(executor).build();
         outcome = params.makeOutcome();
@@ -154,22 +121,28 @@ public class TopicPairOperationTest {
         responseText = coder.encode(new MyResponse());
         stdResponse = coder.decode(responseText, StandardCoderObject.class);
 
+        ntimes = 1;
+
         oper = new MyOperation();
     }
 
     @Test
-    public void testTopicPairOperation_testGetTopicPair_testGetForwarder_testGetPairParams() {
+    public void testConstructor_testGetTopicHandler_testGetForwarder_testGetTopicParams() {
         assertEquals(ACTOR, oper.getActorName());
         assertEquals(OPERATION, oper.getName());
-        assertSame(pair, oper.getTopicPair());
+        assertSame(handler, oper.getTopicHandler());
         assertSame(forwarder, oper.getForwarder());
-        assertSame(topicParams, oper.getPairParams());
+        assertSame(topicParams, oper.getTopicParams());
         assertEquals(TIMEOUT_MS, oper.getTimeoutMs());
         assertSame(MyResponse.class, oper.getResponseClass());
     }
 
     @Test
     public void testStartOperationAsync() throws Exception {
+
+        // tell it to expect three responses
+        ntimes = 3;
+
         CompletableFuture<OperationOutcome> future = oper.startOperationAsync(1, outcome);
         assertFalse(future.isDone());
 
@@ -177,17 +150,24 @@ public class TopicPairOperationTest {
 
         verify(forwarder, never()).unregister(any(), any());
 
-        verify(pair).publish(any());
+        verify(handler).send(any());
 
-        // provide the response
-        listenerCaptor.getValue().accept(CommInfrastructure.NOOP, responseText, stdResponse);
+        // provide first response
+        listenerCaptor.getValue().accept(responseText, stdResponse);
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(future.isDone());
 
-        // run the tasks
-        assertTrue(executor.runAll());
+        // provide second response
+        listenerCaptor.getValue().accept(responseText, stdResponse);
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(future.isDone());
 
+        // provide final response
+        listenerCaptor.getValue().accept(responseText, stdResponse);
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(future.isDone());
 
-        assertSame(outcome, future.get(5, TimeUnit.SECONDS));
+        assertSame(outcome, future.get());
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
 
         verify(forwarder).unregister(eq(Arrays.asList(REQ_ID)), eq(listenerCaptor.getValue()));
@@ -199,7 +179,7 @@ public class TopicPairOperationTest {
     @Test
     public void testStartOperationAsyncException() throws Exception {
         // indicate that nothing was published
-        when(pair.publish(any())).thenReturn(Arrays.asList());
+        when(handler.send(any())).thenReturn(false);
 
         assertThatIllegalStateException().isThrownBy(() -> oper.startOperationAsync(1, outcome));
 
@@ -221,8 +201,7 @@ public class TopicPairOperationTest {
 
     @Test
     public void testPublishRequest() {
-        oper.publishRequest(new MyRequest());
-        assertEquals(INFRA_LIST.size(), appender.getExtracted().size());
+        assertThatCode(() -> oper.publishRequest(new MyRequest())).doesNotThrowAnyException();
     }
 
     /**
@@ -230,7 +209,7 @@ public class TopicPairOperationTest {
      */
     @Test
     public void testPublishRequestUnpublished() {
-        when(pair.publish(any())).thenReturn(Arrays.asList());
+        when(handler.send(any())).thenReturn(false);
         assertThatIllegalStateException().isThrownBy(() -> oper.publishRequest(new MyRequest()));
     }
 
@@ -240,8 +219,7 @@ public class TopicPairOperationTest {
     @Test
     public void testPublishRequestString() {
         MyStringOperation oper2 = new MyStringOperation();
-        oper2.publishRequest(TEXT);
-        assertEquals(INFRA_LIST.size(), appender.getExtracted().size());
+        assertThatCode(() -> oper2.publishRequest(TEXT)).doesNotThrowAnyException();
     }
 
     /**
@@ -260,7 +238,7 @@ public class TopicPairOperationTest {
     public void testProcessResponseSuccessString() {
         MyStringOperation oper2 = new MyStringOperation();
 
-        assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, TEXT, null));
+        assertSame(outcome, oper2.processResponse(outcome, TEXT, null));
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
     }
 
@@ -272,7 +250,7 @@ public class TopicPairOperationTest {
     public void testProcessResponseSuccessSco() {
         MyScoOperation oper2 = new MyScoOperation();
 
-        assertSame(outcome, oper2.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+        assertSame(outcome, oper2.processResponse(outcome, responseText, stdResponse));
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
     }
 
@@ -288,7 +266,7 @@ public class TopicPairOperationTest {
         responseText = coder.encode(resp);
         stdResponse = coder.decode(responseText, StandardCoderObject.class);
 
-        assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+        assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
         assertEquals(PolicyResult.FAILURE, outcome.getResult());
     }
 
@@ -297,7 +275,7 @@ public class TopicPairOperationTest {
      */
     @Test
     public void testProcessResponseDecodeOk() throws CoderException {
-        assertSame(outcome, oper.processResponse(CommInfrastructure.NOOP, outcome, responseText, stdResponse));
+        assertSame(outcome, oper.processResponse(outcome, responseText, stdResponse));
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
     }
 
@@ -308,7 +286,7 @@ public class TopicPairOperationTest {
     public void testProcessResponseDecodeExcept() throws CoderException {
         // @formatter:off
         assertThatIllegalArgumentException().isThrownBy(
-            () -> oper.processResponse(CommInfrastructure.NOOP, outcome, "{invalid json", stdResponse));
+            () -> oper.processResponse(outcome, "{invalid json", stdResponse));
         // @formatter:on
     }
 
@@ -317,88 +295,6 @@ public class TopicPairOperationTest {
         assertThatCode(() -> oper.postProcessResponse(outcome, null, null)).doesNotThrowAnyException();
     }
 
-    @Test
-    public void testLogTopicRequest() {
-        // nothing to log
-        appender.clearExtractions();
-        oper.logTopicRequest(Arrays.asList(), new MyRequest());
-        assertEquals(0, appender.getExtracted().size());
-
-        // log structured data
-        appender.clearExtractions();
-        oper.logTopicRequest(INFRA_LIST, new MyRequest());
-        List<String> output = appender.getExtracted();
-        assertEquals(2, output.size());
-
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString())
-                        .contains("{\n  \"theRequestId\": \"my-request-id\"\n}");
-
-        assertThat(output.get(1)).contains(CommInfrastructure.UEB.toString())
-                        .contains("{\n  \"theRequestId\": \"my-request-id\"\n}");
-
-        // log a plain string
-        appender.clearExtractions();
-        new MyStringOperation().logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), TEXT);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT);
-
-        // log a null request
-        appender.clearExtractions();
-        oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), null);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null");
-
-        // exception from coder
-        setOperCoderException();
-
-        appender.clearExtractions();
-        oper.logTopicRequest(Arrays.asList(CommInfrastructure.NOOP), new MyRequest());
-        output = appender.getExtracted();
-        assertEquals(2, output.size());
-        assertThat(output.get(0)).contains("cannot pretty-print request");
-        assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString());
-    }
-
-    @Test
-    public void testLogTopicResponse() {
-        // log structured data
-        appender.clearExtractions();
-        oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse());
-        List<String> output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString())
-                        .contains("{\n  \"requestId\": \"my-request-id\"\n}");
-
-        // log a plain string
-        appender.clearExtractions();
-        new MyStringOperation().logTopicResponse(CommInfrastructure.NOOP, TEXT);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains(TEXT);
-
-        // log a null response
-        appender.clearExtractions();
-        oper.logTopicResponse(CommInfrastructure.NOOP, null);
-        output = appender.getExtracted();
-        assertEquals(1, output.size());
-
-        assertThat(output.get(0)).contains(CommInfrastructure.NOOP.toString()).contains("null");
-
-        // exception from coder
-        setOperCoderException();
-
-        appender.clearExtractions();
-        oper.logTopicResponse(CommInfrastructure.NOOP, new MyResponse());
-        output = appender.getExtracted();
-        assertEquals(2, output.size());
-        assertThat(output.get(0)).contains("cannot pretty-print response");
-        assertThat(output.get(1)).contains(CommInfrastructure.NOOP.toString());
-    }
-
     @Test
     public void testMakeCoder() {
         assertNotNull(oper.makeCoder());
@@ -436,9 +332,9 @@ public class TopicPairOperationTest {
     }
 
 
-    private class MyStringOperation extends TopicPairOperation<String, String> {
+    private class MyStringOperation extends BidirectionalTopicOperation<String, String> {
         public MyStringOperation() {
-            super(TopicPairOperationTest.this.params, operator, String.class);
+            super(BidirectionalTopicOperationTest.this.params, operator, String.class);
         }
 
         @Override
@@ -452,15 +348,15 @@ public class TopicPairOperationTest {
         }
 
         @Override
-        protected boolean isSuccess(String rawResponse, String response) {
-            return (response != null);
+        protected Status detmStatus(String rawResponse, String response) {
+            return (response != null ? Status.SUCCESS : Status.FAILURE);
         }
     }
 
 
-    private class MyScoOperation extends TopicPairOperation<MyRequest, StandardCoderObject> {
+    private class MyScoOperation extends BidirectionalTopicOperation<MyRequest, StandardCoderObject> {
         public MyScoOperation() {
-            super(TopicPairOperationTest.this.params, operator, StandardCoderObject.class);
+            super(BidirectionalTopicOperationTest.this.params, operator, StandardCoderObject.class);
         }
 
         @Override
@@ -474,15 +370,15 @@ public class TopicPairOperationTest {
         }
 
         @Override
-        protected boolean isSuccess(String rawResponse, StandardCoderObject response) {
-            return (response.getString("output") == null);
+        protected Status detmStatus(String rawResponse, StandardCoderObject response) {
+            return (response.getString("output") == null ? Status.SUCCESS : Status.FAILURE);
         }
     }
 
 
-    private class MyOperation extends TopicPairOperation<MyRequest, MyResponse> {
+    private class MyOperation extends BidirectionalTopicOperation<MyRequest, MyResponse> {
         public MyOperation() {
-            super(TopicPairOperationTest.this.params, operator, MyResponse.class);
+            super(BidirectionalTopicOperationTest.this.params, operator, MyResponse.class);
         }
 
         @Override
@@ -496,8 +392,12 @@ public class TopicPairOperationTest {
         }
 
         @Override
-        protected boolean isSuccess(String rawResponse, MyResponse response) {
-            return (response.getOutput() == null);
+        protected Status detmStatus(String rawResponse, MyResponse response) {
+            if (--ntimes <= 0) {
+                return (response.getOutput() == null ? Status.SUCCESS : Status.FAILURE);
+            }
+
+            return Status.STILL_WAITING;
         }
     }
 }