More sonars in models
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperation.java
index d1e21f8..b0ad38e 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,19 +23,18 @@ package org.onap.policy.controlloop.actorserviceprovider.impl;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import lombok.Getter;
 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
+import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.policy.PolicyResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,34 +54,41 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
         SUCCESS, FAILURE, STILL_WAITING
     }
 
-    // fields extracted from the operator
-
-    private final BidirectionalTopicHandler topicHandler;
-    private final Forwarder forwarder;
-    private final BidirectionalTopicParams topicParams;
-    private final long timeoutMs;
+    /**
+     * Configuration for this operation.
+     */
+    private final BidirectionalTopicConfig config;
 
     /**
      * Response class.
      */
     private final Class<S> responseClass;
 
+    // fields extracted from "config"
+
+    private final BidirectionalTopicHandler topicHandler;
+    private final Forwarder forwarder;
+
 
     /**
      * Constructs the object.
      *
      * @param params operation parameters
-     * @param operator operator that created this operation
+     * @param config configuration for this operation
      * @param clazz response class
+     * @param propertyNames names of properties required by this operation
      */
-    public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicOperator operator,
-                    Class<S> clazz) {
-        super(params, operator);
-        this.topicHandler = operator.getTopicHandler();
-        this.forwarder = operator.getForwarder();
-        this.topicParams = operator.getParams();
+    protected BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config,
+                    Class<S> clazz, List<String> propertyNames) {
+        super(params, config, propertyNames);
+        this.config = config;
         this.responseClass = clazz;
-        this.timeoutMs = TimeUnit.MILLISECONDS.convert(topicParams.getTimeoutSec(), TimeUnit.SECONDS);
+        this.forwarder = config.getForwarder();
+        this.topicHandler = config.getTopicHandler();
+    }
+
+    public long getTimeoutMs() {
+        return config.getTimeoutMs();
     }
 
     /**
@@ -91,7 +97,7 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
     @Override
     protected long getTimeoutMs(Integer timeoutSec) {
         // TODO move this method to the superclass
-        return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec));
+        return (timeoutSec == null || timeoutSec == 0 ? getTimeoutMs() : super.getTimeoutMs(timeoutSec));
     }
 
     /**
@@ -109,10 +115,15 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
         // register a listener BEFORE publishing
 
         BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
-            OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
-            if (latestOutcome != null) {
-                // final response - complete the controller
-                controller.completeAsync(() -> latestOutcome, executor);
+            try {
+                OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
+                if (latestOutcome != null) {
+                    // final response - complete the controller
+                    controller.completeAsync(() -> latestOutcome, executor);
+                }
+            } catch (RuntimeException e) {
+                logger.warn("{}: failed to process response for {}", getFullName(), params.getRequestId());
+                controller.completeExceptionally(e);
             }
         };
 
@@ -156,22 +167,12 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
      * @param request request to be published
      */
     protected void publishRequest(Q request) {
-        String json;
-        try {
-            if (request instanceof String) {
-                json = request.toString();
-            } else {
-                json = makeCoder().encode(request);
-            }
-        } catch (CoderException e) {
-            throw new IllegalArgumentException("cannot encode request", e);
-        }
+        String json = prettyPrint(request);
+        logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), json);
 
         if (!topicHandler.send(json)) {
             throw new IllegalStateException("nothing published");
         }
-
-        logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request);
     }
 
     /**
@@ -201,7 +202,7 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
 
         } else {
             try {
-                response = makeCoder().decode(rawResponse, responseClass);
+                response = getCoder().decode(rawResponse, responseClass);
             } catch (CoderException e) {
                 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
                                 params.getRequestId());
@@ -214,14 +215,14 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
             case SUCCESS:
                 logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
                                 params.getRequestId());
-                setOutcome(outcome, PolicyResult.SUCCESS, response);
+                setOutcome(outcome, OperationResult.SUCCESS, response);
                 postProcessResponse(outcome, rawResponse, response);
                 return outcome;
 
             case FAILURE:
                 logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
                                 params.getRequestId());
-                return setOutcome(outcome, PolicyResult.FAILURE, response);
+                return setOutcome(outcome, OperationResult.FAILURE, response);
 
             case STILL_WAITING:
             default:
@@ -239,7 +240,8 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
      * @param response response used to populate the outcome
      * @return the updated operation
      */
-    public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, S response) {
+    public OperationOutcome setOutcome(OperationOutcome outcome, OperationResult result, S response) {
+        outcome.setResponse(response);
         return setOutcome(outcome, result);
     }