Remove Target and TargetType
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperation.java
index 1ae8049..e02e592 100644 (file)
@@ -29,12 +29,12 @@ import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.policy.PolicyResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,10 +76,11 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
      * @param params operation parameters
      * @param config configuration for this operation
      * @param clazz response class
+     * @param propertyNames names of properties required by this operation
      */
     public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config,
-                    Class<S> clazz) {
-        super(params, config);
+                    Class<S> clazz, List<String> propertyNames) {
+        super(params, config, propertyNames);
         this.config = config;
         this.responseClass = clazz;
         this.forwarder = config.getForwarder();
@@ -114,10 +115,15 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
         // register a listener BEFORE publishing
 
         BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
-            OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
-            if (latestOutcome != null) {
-                // final response - complete the controller
-                controller.completeAsync(() -> latestOutcome, executor);
+            try {
+                OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
+                if (latestOutcome != null) {
+                    // final response - complete the controller
+                    controller.completeAsync(() -> latestOutcome, executor);
+                }
+            } catch (RuntimeException e) {
+                logger.warn("{}: failed to process response for {}", getFullName(), params.getRequestId());
+                controller.completeExceptionally(e);
             }
         };
 
@@ -161,22 +167,12 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
      * @param request request to be published
      */
     protected void publishRequest(Q request) {
-        String json;
-        try {
-            if (request instanceof String) {
-                json = request.toString();
-            } else {
-                json = makeCoder().encode(request);
-            }
-        } catch (CoderException e) {
-            throw new IllegalArgumentException("cannot encode request", e);
-        }
+        String json = prettyPrint(request);
+        logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), json);
 
         if (!topicHandler.send(json)) {
             throw new IllegalStateException("nothing published");
         }
-
-        logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request);
     }
 
     /**
@@ -206,7 +202,7 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
 
         } else {
             try {
-                response = makeCoder().decode(rawResponse, responseClass);
+                response = getCoder().decode(rawResponse, responseClass);
             } catch (CoderException e) {
                 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
                                 params.getRequestId());
@@ -219,14 +215,14 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
             case SUCCESS:
                 logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
                                 params.getRequestId());
-                setOutcome(outcome, PolicyResult.SUCCESS, response);
+                setOutcome(outcome, OperationResult.SUCCESS, response);
                 postProcessResponse(outcome, rawResponse, response);
                 return outcome;
 
             case FAILURE:
                 logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
                                 params.getRequestId());
-                return setOutcome(outcome, PolicyResult.FAILURE, response);
+                return setOutcome(outcome, OperationResult.FAILURE, response);
 
             case STILL_WAITING:
             default:
@@ -244,7 +240,8 @@ public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial
      * @param response response used to populate the outcome
      * @return the updated operation
      */
-    public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, S response) {
+    public OperationOutcome setOutcome(OperationOutcome outcome, OperationResult result, S response) {
+        outcome.setResponse(response);
         return setOutcome(outcome, result);
     }