Make Actors event-agnostic
[policy/models.git] / models-interactions / model-actors / actor.cds / src / main / java / org / onap / policy / controlloop / actor / cds / GrpcOperation.java
index 820f4de..44d5181 100644 (file)
@@ -25,8 +25,10 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Struct;
 import com.google.protobuf.Struct.Builder;
 import com.google.protobuf.util.JsonFormat;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.UUID;
@@ -38,20 +40,17 @@ import org.onap.aai.domain.yang.ServiceInstance;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
-import org.onap.policy.aai.AaiConstants;
-import org.onap.policy.aai.AaiCqResponse;
 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
 import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.onap.policy.controlloop.actor.aai.AaiCustomQueryOperation;
-import org.onap.policy.controlloop.actor.aai.AaiGetPnfOperation;
 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
 import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
+import org.onap.policy.controlloop.actorserviceprovider.TargetType;
 import org.onap.policy.controlloop.actorserviceprovider.Util;
 import org.onap.policy.controlloop.actorserviceprovider.impl.OperationPartial;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.policy.TargetType;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 
 /**
  * Operation that uses gRPC to send request to CDS.
@@ -73,16 +72,26 @@ public class GrpcOperation extends OperationPartial {
      */
     private final GrpcConfig config;
 
-    /**
-     * Function to request the A&AI data appropriate to the target type.
-     */
-    private final Supplier<CompletableFuture<OperationOutcome>> aaiRequestor;
-
     /**
      * Function to convert the A&AI data associated with the target type.
      */
     private final Supplier<Map<String, String>> aaiConverter;
 
+
+    // @formatter:off
+    private static final List<String> PNF_PROPERTY_NAMES = List.of(
+                            OperationProperties.AAI_PNF,
+                            OperationProperties.EVENT_ADDITIONAL_PARAMS,
+                            OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
+
+
+    private static final List<String> VNF_PROPERTY_NAMES = List.of(
+                            OperationProperties.AAI_RESOURCE_VNF,
+                            OperationProperties.AAI_SERVICE,
+                            OperationProperties.EVENT_ADDITIONAL_PARAMS,
+                            OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
+    // @formatter:on
+
     /**
      * Constructs the object.
      *
@@ -90,58 +99,27 @@ public class GrpcOperation extends OperationPartial {
      * @param config configuration for this operation
      */
     public GrpcOperation(ControlLoopOperationParams params, GrpcConfig config) {
-        super(params, config);
+        super(params, config, Collections.emptyList());
         this.config = config;
 
-        if (TargetType.PNF.equals(params.getTarget().getType())) {
-            aaiRequestor = this::getPnf;
+        if (TargetType.PNF.equals(params.getTargetType())) {
             aaiConverter = this::convertPnfToAaiProperties;
         } else {
-            aaiRequestor = this::getCq;
-            aaiConverter = this::convertCqToAaiProperties;
+            aaiConverter = this::convertVnfToAaiProperties;
         }
     }
 
-    /**
-     * If no timeout is specified, then it returns the operator's configured timeout.
-     */
     @Override
-    protected long getTimeoutMs(Integer timeoutSec) {
-        return (timeoutSec == null || timeoutSec == 0 ? config.getTimeoutMs() : super.getTimeoutMs(timeoutSec));
+    public List<String> getPropertyNames() {
+        return (TargetType.PNF.equals(params.getTargetType()) ? PNF_PROPERTY_NAMES : VNF_PROPERTY_NAMES);
     }
 
     /**
-     * Ensures that A&AI query has been performed, and runs the guard.
+     * If no timeout is specified, then it returns the operator's configured timeout.
      */
     @Override
-    @SuppressWarnings("unchecked")
-    protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
-        // run A&AI Query and Guard, in parallel
-        return allOf(aaiRequestor, this::startGuardAsync);
-    }
-
-    /**
-     * Requests the A&AI PNF data.
-     *
-     * @return a future to get the PNF data
-     */
-    private CompletableFuture<OperationOutcome> getPnf() {
-        ControlLoopOperationParams pnfParams = params.toBuilder().actor(AaiConstants.ACTOR_NAME)
-                        .operation(AaiGetPnfOperation.NAME).payload(null).retry(null).timeoutSec(null).build();
-
-        return params.getContext().obtain(AaiGetPnfOperation.getKey(params.getTargetEntity()), pnfParams);
-    }
-
-    /**
-     * Requests the A&AI Custom Query data.
-     *
-     * @return a future to get the custom query data
-     */
-    private CompletableFuture<OperationOutcome> getCq() {
-        ControlLoopOperationParams cqParams = params.toBuilder().actor(AaiConstants.ACTOR_NAME)
-                        .operation(AaiCustomQueryOperation.NAME).payload(null).retry(null).timeoutSec(null).build();
-
-        return params.getContext().obtain(AaiCqResponse.CONTEXT_KEY, cqParams);
+    protected long getTimeoutMs(Integer timeoutSec) {
+        return (timeoutSec == null || timeoutSec == 0 ? config.getTimeoutMs() : super.getTimeoutMs(timeoutSec));
     }
 
     /**
@@ -151,11 +129,16 @@ public class GrpcOperation extends OperationPartial {
      * @return a map of the PNF data
      */
     private Map<String, String> convertPnfToAaiProperties() {
+        Map<String, String> result = getProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
+        if (result != null) {
+            return result;
+        }
+
         // convert PNF data to a Map
-        StandardCoderObject pnf = params.getContext().getProperty(AaiGetPnfOperation.getKey(params.getTargetEntity()));
-        Map<String, Object> source = Util.translateToMap(getFullName(), pnf);
+        Object pnfData = getRequiredProperty(OperationProperties.AAI_PNF, "PNF");
+        Map<String, Object> source = Util.translateToMap(getFullName(), pnfData);
 
-        Map<String, String> result = new LinkedHashMap<>();
+        result = new LinkedHashMap<>();
 
         for (Entry<String, Object> ent : source.entrySet()) {
             result.put(AAI_PNF_PREFIX + ent.getKey(), ent.getValue().toString());
@@ -165,32 +148,36 @@ public class GrpcOperation extends OperationPartial {
     }
 
     /**
-     * Converts the A&AI Custom Query data to a map suitable for passing via the
+     * Converts the A&AI VNF data to a map suitable for passing via the
      * "aaiProperties" field in the CDS request.
      *
-     * @return a map of the custom query data
+     * @return a map of the VNF data
      */
-    private Map<String, String> convertCqToAaiProperties() {
-        AaiCqResponse aaicq = params.getContext().getProperty(AaiCqResponse.CONTEXT_KEY);
-
-        Map<String, String> result = new LinkedHashMap<>();
-
-        ServiceInstance serviceInstance = aaicq.getServiceInstance();
-        if (serviceInstance == null) {
-            throw new IllegalArgumentException("Target service instance could not be found");
+    private Map<String, String> convertVnfToAaiProperties() {
+        Map<String, String> result = getProperty(OperationProperties.OPT_CDS_GRPC_AAI_PROPERTIES);
+        if (result != null) {
+            return result;
         }
 
-        GenericVnf genericVnf = aaicq.getGenericVnfByModelInvariantId(params.getTarget().getResourceID());
-        if (genericVnf == null) {
-            throw new IllegalArgumentException("Target generic vnf could not be found");
-        }
+        result = new LinkedHashMap<>();
 
-        result.put(AAI_SERVICE_INSTANCE_ID_KEY, serviceInstance.getServiceInstanceId());
-        result.put(AAI_VNF_ID_KEY, genericVnf.getVnfId());
+        result.put(AAI_SERVICE_INSTANCE_ID_KEY, getServiceInstanceId());
+        result.put(AAI_VNF_ID_KEY, getVnfId());
 
         return result;
     }
 
+    protected String getServiceInstanceId() {
+        ServiceInstance serviceInstance =
+                        getRequiredProperty(OperationProperties.AAI_SERVICE, "Target service instance");
+        return serviceInstance.getServiceInstanceId();
+    }
+
+    protected String getVnfId() {
+        GenericVnf genericVnf = getRequiredProperty(OperationProperties.AAI_RESOURCE_VNF, "Target generic vnf");
+        return genericVnf.getVnfId();
+    }
+
     @Override
     public void generateSubRequestId(int attempt) {
         setSubRequestId("0");
@@ -202,13 +189,26 @@ public class GrpcOperation extends OperationPartial {
     @Override
     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
+        /*
+         * construct the request first so that we don't have to clean up the "client" if
+         * an exception is thrown
+         */
+        ExecutionServiceInput request = constructRequest();
+
         CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
+
         client = new CdsProcessorGrpcClient(new CdsActorServiceManager(outcome, future),
                         config.getCdsServerProperties());
 
-        ExecutionServiceInput request = constructRequest(params);
         client.sendRequest(request);
-        return future;
+
+        // arrange to shutdown the client when the request completes
+        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+        controller.wrap(future).whenCompleteAsync(controller.delayedComplete(), params.getExecutor())
+                        .whenCompleteAsync((arg1, arg2) -> client.close(), getBlockingExecutor());
+
+        return controller;
     }
 
     /**
@@ -216,10 +216,9 @@ public class GrpcOperation extends OperationPartial {
      * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD
      * item for Frankfurt release.
      *
-     * @param params the control loop parameters specifying the onset, payload, etc.
      * @return an ExecutionServiceInput instance.
      */
-    public ExecutionServiceInput constructRequest(ControlLoopOperationParams params) {
+    public ExecutionServiceInput constructRequest() {
 
         // For the current operational TOSCA policy model (yaml) CBA name and version are
         // embedded in the payload
@@ -256,8 +255,9 @@ public class GrpcOperation extends OperationPartial {
         request.setAaiProperties(aaiConverter.get());
 
         // Inject any additional event parameters that may be present in the onset event
-        if (params.getContext().getEvent().getAdditionalEventParams() != null) {
-            request.setAdditionalEventParams(params.getContext().getEvent().getAdditionalEventParams());
+        Map<String, String> additionalParams = getProperty(OperationProperties.EVENT_ADDITIONAL_PARAMS);
+        if (additionalParams != null) {
+            request.setAdditionalEventParams(additionalParams);
         }
 
         Builder struct = Struct.newBuilder();
@@ -273,8 +273,7 @@ public class GrpcOperation extends OperationPartial {
 
         // Build CDS gRPC request common-header
         CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
-                        .setRequestId(params.getContext().getEvent().getRequestId().toString())
-                        .setSubRequestId(getSubRequestId()).build();
+                        .setRequestId(params.getRequestId().toString()).setSubRequestId(getSubRequestId()).build();
 
         // Build CDS gRPC request action-identifier
         ActionIdentifiers actionIdentifiers =