Use BidirectionalTopicClient from policy-common
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperation.java
@@ -24,40 +24,42 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 import lombok.Getter;
-import org.apache.commons.lang3.tuple.Triple;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
-import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
 import org.onap.policy.controlloop.policy.PolicyResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Operation that uses a Topic pair.
+ * Operation that uses a bidirectional topic.
  *
  * @param <S> response type
  */
 @Getter
-public abstract class TopicPairOperation<Q, S> extends OperationPartial {
-    private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class);
-    private static final Coder coder = new StandardCoder();
+public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial {
+    private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class);
+
+    /**
+     * Response status.
+     */
+    public enum Status {
+        SUCCESS, FAILURE, STILL_WAITING
+    }
 
     // fields extracted from the operator
 
-    private final TopicPair topicPair;
+    private final BidirectionalTopicHandler topicHandler;
     private final Forwarder forwarder;
-    private final TopicPairParams pairParams;
+    private final BidirectionalTopicParams topicParams;
     private final long timeoutMs;
 
     /**
@@ -73,13 +75,14 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
      * @param operator operator that created this operation
      * @param clazz response class
      */
-    public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) {
+    public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicOperator operator,
+                    Class<S> clazz) {
         super(params, operator);
-        this.topicPair = operator.getTopicPair();
+        this.topicHandler = operator.getTopicHandler();
         this.forwarder = operator.getForwarder();
-        this.pairParams = operator.getParams();
+        this.topicParams = operator.getParams();
         this.responseClass = clazz;
-        this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS);
+        this.timeoutMs = TimeUnit.MILLISECONDS.convert(topicParams.getTimeoutSec(), TimeUnit.SECONDS);
     }
 
     /**
@@ -101,18 +104,17 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
         final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
 
         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-        final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future =
-                        new CompletableFuture<>();
         final Executor executor = params.getExecutor();
 
         // register a listener BEFORE publishing
 
-        // @formatter:off
-        TriConsumer<CommInfrastructure, String, StandardCoderObject> listener =
-            (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse));
-        // @formatter:on
-
-        // TODO this currently only allows a single matching response
+        BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
+            OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
+            if (latestOutcome != null) {
+                // final response - complete the controller
+                controller.completeAsync(() -> latestOutcome, executor);
+            }
+        };
 
         forwarder.register(expectedKeyValues, listener);
 
@@ -128,16 +130,6 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
             throw e;
         }
 
-
-        // once "future" completes, process the response, and then complete the controller
-
-        // @formatter:off
-        future.thenApplyAsync(
-            triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()),
-                            executor)
-                        .whenCompleteAsync(controller.delayedComplete(), executor);
-        // @formatter:on
-
         return controller;
     }
 
@@ -175,12 +167,11 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
             throw new IllegalArgumentException("cannot encode request", e);
         }
 
-        List<CommInfrastructure> list = topicPair.publish(json);
-        if (list.isEmpty()) {
+        if (!topicHandler.send(json)) {
             throw new IllegalStateException("nothing published");
         }
 
-        logTopicRequest(list, request);
+        logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request);
     }
 
     /**
@@ -190,15 +181,17 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
      * @param outcome outcome to be populated
      * @param response raw response to process
      * @param scoResponse response, as a {@link StandardCoderObject}
-     * @return the outcome
+     * @return the outcome, or {@code null} if still waiting for completion
      */
-    protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse,
+    protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
                     StandardCoderObject scoResponse) {
 
         logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
 
-        logTopicResponse(infra, rawResponse);
+        logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(),
+                        rawResponse);
 
+        // decode the response
         S response;
         if (responseClass == String.class) {
             response = responseClass.cast(rawResponse);
@@ -216,17 +209,26 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
             }
         }
 
-        if (!isSuccess(rawResponse, response)) {
-            logger.info("{}.{} request failed  for {}", params.getActor(), params.getOperation(),
-                            params.getRequestId());
-            return setOutcome(outcome, PolicyResult.FAILURE);
-        }
+        // check its status
+        switch (detmStatus(rawResponse, response)) {
+            case SUCCESS:
+                logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
+                                params.getRequestId());
+                setOutcome(outcome, PolicyResult.SUCCESS);
+                postProcessResponse(outcome, rawResponse, response);
+                return outcome;
 
-        logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
-        setOutcome(outcome, PolicyResult.SUCCESS);
-        postProcessResponse(outcome, rawResponse, response);
+            case FAILURE:
+                logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
+                                params.getRequestId());
+                return setOutcome(outcome, PolicyResult.FAILURE);
 
-        return outcome;
+            case STILL_WAITING:
+            default:
+                logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
+                                params.getRequestId());
+                return null;
+        }
     }
 
     /**
@@ -241,76 +243,11 @@ public abstract class TopicPairOperation<Q, S> extends OperationPartial {
     }
 
     /**
-     * Determines if the response indicates success.
+     * Determines the status of the response.
      *
      * @param rawResponse raw response
      * @param response decoded response
-     * @return {@code true} if the response indicates success, {@code false} otherwise
-     */
-    protected abstract boolean isSuccess(String rawResponse, S response);
-
-    /**
-     * Logs a TOPIC request. If the request is not of type, String, then it attempts to
-     * pretty-print it into JSON before logging.
-     *
-     * @param infrastructures list of communication infrastructures on which it was
-     *        published
-     * @param request request to be logged
-     */
-    protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) {
-        if (infrastructures.isEmpty()) {
-            return;
-        }
-
-        String json;
-        try {
-            if (request == null) {
-                json = null;
-            } else if (request instanceof String) {
-                json = request.toString();
-            } else {
-                json = makeCoder().encode(request, true);
-            }
-
-        } catch (CoderException e) {
-            logger.warn("cannot pretty-print request", e);
-            json = request.toString();
-        }
-
-        for (CommInfrastructure infra : infrastructures) {
-            logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json);
-        }
-    }
-
-    /**
-     * Logs a TOPIC response. If the response is not of type, String, then it attempts to
-     * pretty-print it into JSON before logging.
-     *
-     * @param infra communication infrastructure on which the response was received
-     * @param response response to be logged
+     * @return the status of the response
      */
-    protected <T> void logTopicResponse(CommInfrastructure infra, T response) {
-        String json;
-        try {
-            if (response == null) {
-                json = null;
-            } else if (response instanceof String) {
-                json = response.toString();
-            } else {
-                json = makeCoder().encode(response, true);
-            }
-
-        } catch (CoderException e) {
-            logger.warn("cannot pretty-print response", e);
-            json = response.toString();
-        }
-
-        logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json);
-    }
-
-    // these may be overridden by junit tests
-
-    protected Coder makeCoder() {
-        return coder;
-    }
+    protected abstract Status detmStatus(String rawResponse, S response);
 }