Skip preprocessor step in Actors
[policy/models.git] / models-interactions / model-actors / actor.cds / src / main / java / org / onap / policy / controlloop / actor / cds / GrpcOperation.java
index 820f4de..ec8f2ac 100644 (file)
@@ -25,8 +25,10 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Struct;
 import com.google.protobuf.Struct.Builder;
 import com.google.protobuf.util.JsonFormat;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
@@ -48,9 +50,11 @@ import org.onap.policy.controlloop.actor.aai.AaiGetPnfOperation;
 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
 import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
 import org.onap.policy.controlloop.actorserviceprovider.Util;
 import org.onap.policy.controlloop.actorserviceprovider.impl.OperationPartial;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 import org.onap.policy.controlloop.policy.TargetType;
 
 /**
@@ -83,6 +87,19 @@ public class GrpcOperation extends OperationPartial {
      */
     private final Supplier<Map<String, String>> aaiConverter;
 
+
+    // @formatter:off
+    private static final List<String> PNF_PROPERTY_NAMES = List.of(
+                            OperationProperties.AAI_PNF,
+                            OperationProperties.EVENT_ADDITIONAL_PARAMS);
+
+
+    private static final List<String> VNF_PROPERTY_NAMES = List.of(
+                            OperationProperties.AAI_MODEL_INVARIANT_GENERIC_VNF,
+                            OperationProperties.AAI_RESOURCE_SERVICE_INSTANCE,
+                            OperationProperties.EVENT_ADDITIONAL_PARAMS);
+    // @formatter:on
+
     /**
      * Constructs the object.
      *
@@ -90,7 +107,7 @@ public class GrpcOperation extends OperationPartial {
      * @param config configuration for this operation
      */
     public GrpcOperation(ControlLoopOperationParams params, GrpcConfig config) {
-        super(params, config);
+        super(params, config, Collections.emptyList());
         this.config = config;
 
         if (TargetType.PNF.equals(params.getTarget().getType())) {
@@ -102,6 +119,11 @@ public class GrpcOperation extends OperationPartial {
         }
     }
 
+    @Override
+    public List<String> getPropertyNames() {
+        return (TargetType.PNF.equals(params.getTarget().getType()) ? PNF_PROPERTY_NAMES : VNF_PROPERTY_NAMES);
+    }
+
     /**
      * If no timeout is specified, then it returns the operator's configured timeout.
      */
@@ -116,6 +138,10 @@ public class GrpcOperation extends OperationPartial {
     @Override
     @SuppressWarnings("unchecked")
     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
+        if (params.isPreprocessed()) {
+            return null;
+        }
+
         // run A&AI Query and Guard, in parallel
         return allOf(aaiRequestor, this::startGuardAsync);
     }
@@ -202,13 +228,26 @@ public class GrpcOperation extends OperationPartial {
     @Override
     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
+        /*
+         * construct the request first so that we don't have to clean up the "client" if
+         * an exception is thrown
+         */
+        ExecutionServiceInput request = constructRequest(params);
+
         CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
+
         client = new CdsProcessorGrpcClient(new CdsActorServiceManager(outcome, future),
                         config.getCdsServerProperties());
 
-        ExecutionServiceInput request = constructRequest(params);
         client.sendRequest(request);
-        return future;
+
+        // arrange to shutdown the client when the request completes
+        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+        controller.wrap(future).whenCompleteAsync(controller.delayedComplete(), params.getExecutor())
+                        .whenCompleteAsync((arg1, arg2) -> client.close(), getBlockingExecutor());
+
+        return controller;
     }
 
     /**
@@ -273,8 +312,7 @@ public class GrpcOperation extends OperationPartial {
 
         // Build CDS gRPC request common-header
         CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
-                        .setRequestId(params.getContext().getEvent().getRequestId().toString())
-                        .setSubRequestId(getSubRequestId()).build();
+                        .setRequestId(params.getRequestId().toString()).setSubRequestId(getSubRequestId()).build();
 
         // Build CDS gRPC request action-identifier
         ActionIdentifiers actionIdentifiers =