Add actor for CDS 67/102167/5
authorRam Krishna Verma <ram_krishna.verma@bell.ca>
Fri, 21 Feb 2020 22:24:47 +0000 (17:24 -0500)
committerRam Krishna Verma <ram_krishna.verma@bell.ca>
Tue, 25 Feb 2020 21:03:07 +0000 (16:03 -0500)
1) Create the operator, operation & manager classes for gRPC request.
2) Use CompletableFuture to track CDS request flow.

Issue-ID: POLICY-2384
Change-Id: I84e30131a69c2d24c1871ceebced2b69194f619c
Signed-off-by: Ram Krishna Verma <ram_krishna.verma@bell.ca>
models-interactions/model-actors/actor.cds/pom.xml
models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceManager.java [new file with mode: 0644]
models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceProvider.java
models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcConfig.java [new file with mode: 0644]
models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java [new file with mode: 0644]
models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperator.java [new file with mode: 0644]

index 4a5979e..d17e0b2 100644 (file)
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.onap.policy.models.policy-models-interactions.model-actors</groupId>
+            <artifactId>actor.aai</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId>
             <artifactId>events</artifactId>
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceManager.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceManager.java
new file mode 100644 (file)
index 0000000..2afa9fa
--- /dev/null
@@ -0,0 +1,81 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 Bell Canada. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds;
+
+import java.util.concurrent.CompletableFuture;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.cds.api.CdsProcessorListener;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CDS Actor service-manager implementation.
+ */
+public class CdsActorServiceManager implements CdsProcessorListener {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceManager.class);
+
+    private final CompletableFuture<OperationOutcome> future;
+
+    private final OperationOutcome outcome;
+
+    /**
+     * Constructs the object.
+     *
+     * @param outcome the operation outcome to populate
+     * @param future the future to complete
+     */
+    public CdsActorServiceManager(OperationOutcome outcome, CompletableFuture<OperationOutcome> future) {
+        this.outcome = outcome;
+        this.future = future;
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public void onMessage(final ExecutionServiceOutput message) {
+        LOGGER.info("Received notification from CDS: {}", message);
+        EventType eventType = message.getStatus().getEventType();
+        switch (eventType) {
+            case EVENT_COMPONENT_PROCESSING:
+                LOGGER.info("CDS is processing the message: {}", message);
+                break;
+            case EVENT_COMPONENT_EXECUTED:
+                outcome.setResult(PolicyResult.SUCCESS);
+                future.complete(outcome);
+                break;
+            default:
+                outcome.setResult(PolicyResult.FAILURE);
+                future.complete(outcome);
+                break;
+        }
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public void onError(final Throwable throwable) {
+        future.completeExceptionally(throwable);
+    }
+}
index 05ff02e..91ee55d 100644 (file)
@@ -54,16 +54,24 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
+ * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto
+ * release.
  */
 public class CdsActorServiceProvider extends ActorImpl {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
 
+    /**
+     * Constructs the object.
+     */
     public CdsActorServiceProvider() {
         super(CdsActorConstants.CDS_ACTOR);
+
+        addOperator(new GrpcOperator(CdsActorConstants.CDS_ACTOR, GrpcOperation.NAME, GrpcOperation::new));
     }
 
+    // TODO old code: remove lines down to **HERE**
+
     /**
      * {@inheritDoc}.
      */
@@ -97,20 +105,26 @@ public class CdsActorServiceProvider extends ActorImpl {
     }
 
     /**
-     * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
-     * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
+     * Build the CDS ExecutionServiceInput request from the policy object and the AAI
+     * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD
+     * item for Frankfurt release.
      *
      * @param onset the event that is reporting the alert for policy to perform an action.
-     * @param operation the control loop operation specifying the actor, operation, target, etc.
-     * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
+     * @param operation the control loop operation specifying the actor, operation,
+     *        target, etc.
+     * @param policy the policy specified from the yaml generated by CLAMP or through
+     *        Policy API.
      * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
-     * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
+     * @return an Optional ExecutionServiceInput instance if valid else an Optional empty
+     *         object is returned.
      */
     public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
-        ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
+                    ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
 
-        // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
-        // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
+        // For the current operational TOSCA policy model (yaml) CBA name and version are
+        // embedded in the payload
+        // section, with the new policy type model being proposed in Frankfurt we will be
+        // able to move it out.
         Map<String, String> payload = policy.getPayload();
         if (!validateCdsMandatoryParams(policy)) {
             return Optional.empty();
@@ -118,12 +132,14 @@ public class CdsActorServiceProvider extends ActorImpl {
         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
 
-        // Retain only the payload by removing CBA name and version once they are extracted
+        // Retain only the payload by removing CBA name and version once they are
+        // extracted
         // to be put in CDS request header.
         payload.remove(CdsActorConstants.KEY_CBA_NAME);
         payload.remove(CdsActorConstants.KEY_CBA_VERSION);
 
-        // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
+        // Embed payload from policy to ConfigDeployRequest object, serialize and inject
+        // into grpc request.
         String cbaActionName = policy.getRecipe();
         CdsActionRequest request = new CdsActionRequest();
         request.setPolicyPayload(payload);
@@ -132,7 +148,8 @@ public class CdsActorServiceProvider extends ActorImpl {
 
         // Inject AAI properties into payload map. Offer flexibility to the usecase
         // implementation to inject whatever AAI parameters are of interest to them.
-        // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
+        // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as
+        // needed by CDS.
         request.setAaiProperties(aaiParams);
 
         // Inject any additional event parameters that may be present in the onset event
@@ -143,36 +160,28 @@ public class CdsActorServiceProvider extends ActorImpl {
         Builder struct = Struct.newBuilder();
         try {
             String requestStr = request.generateCdsPayload();
-            Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
-                + "config-deploy-request from payload parameters: {}", payload);
+            Preconditions.checkState(!Strings.isNullOrEmpty(requestStr),
+                            "Unable to build " + "config-deploy-request from payload parameters: {}", payload);
             JsonFormat.parser().merge(requestStr, struct);
         } catch (InvalidProtocolBufferException | CoderException e) {
             LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
-                    cbaName, cbaVersion, cbaActionName, e);
+                            cbaName, cbaVersion, cbaActionName, e);
             return Optional.empty();
         }
 
         // Build CDS gRPC request common-header
-        CommonHeader commonHeader = CommonHeader.newBuilder()
-            .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
-            .setRequestId(onset.getRequestId().toString())
-            .setSubRequestId(operation.getSubRequestId())
-            .build();
+        CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
+                        .setRequestId(onset.getRequestId().toString()).setSubRequestId(operation.getSubRequestId())
+                        .build();
 
         // Build CDS gRPC request action-identifier
-        ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
-            .setBlueprintName(cbaName)
-            .setBlueprintVersion(cbaVersion)
-            .setActionName(cbaActionName)
-            .setMode(CdsActorConstants.CDS_MODE)
-            .build();
+        ActionIdentifiers actionIdentifiers =
+                        ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion)
+                                        .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build();
 
         // Finally build the ExecutionServiceInput gRPC request object.
-        ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
-            .setCommonHeader(commonHeader)
-            .setActionIdentifiers(actionIdentifiers)
-            .setPayload(struct.build())
-            .build();
+        ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader)
+                        .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build();
         return Optional.of(executionServiceInput);
     }
 
@@ -184,8 +193,8 @@ public class CdsActorServiceProvider extends ActorImpl {
         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
         String cbaActionName = policy.getRecipe();
-        return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
-            .isNullOrEmpty(cbaActionName);
+        return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion)
+                        && !Strings.isNullOrEmpty(cbaActionName);
     }
 
     public class CdsActorServiceManager implements CdsProcessorListener {
@@ -234,10 +243,11 @@ public class CdsActorServiceProvider extends ActorImpl {
          * @return the cds response.
          */
         public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
-                                            ExecutionServiceInput executionServiceInput) {
+                        ExecutionServiceInput executionServiceInput) {
             try {
                 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
-                // TO-DO: Handle requests asynchronously once the callback support is added to actors.
+                // TO-DO: Handle requests asynchronously once the callback support is
+                // added to actors.
                 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
                 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
                 if (!status) {
@@ -252,9 +262,9 @@ public class CdsActorServiceProvider extends ActorImpl {
             LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
 
             CdsResponse response = new CdsResponse();
-            response.setRequestId(
-                    executionServiceInput != null && executionServiceInput.getCommonHeader() != null
-                        ? executionServiceInput.getCommonHeader().getRequestId() : null);
+            response.setRequestId(executionServiceInput != null && executionServiceInput.getCommonHeader() != null
+                            ? executionServiceInput.getCommonHeader().getRequestId()
+                            : null);
             response.setStatus(this.getCdsStatus());
             return response;
         }
@@ -263,4 +273,6 @@ public class CdsActorServiceProvider extends ActorImpl {
             return cdsStatus.get();
         }
     }
+
+    // **HERE**
 }
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcConfig.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcConfig.java
new file mode 100644 (file)
index 0000000..3d79149
--- /dev/null
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 Bell Canada. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.onap.policy.cds.properties.CdsServerProperties;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
+
+/**
+ * Configuration for gRPC Operators.
+ */
+@Getter
+public class GrpcConfig extends OperatorConfig {
+
+    /**
+     * Default timeout, in milliseconds, if none specified in the request.
+     */
+    private final long timeoutMs;
+
+    private CdsServerProperties cdsServerProperties;
+
+    /**
+     * Constructs the object.
+     *
+     * @param blockingExecutor executor to be used for tasks that may perform blocking I/O
+     * @param params operator parameters
+     */
+    public GrpcConfig(Executor blockingExecutor, CdsServerProperties params) {
+        super(blockingExecutor);
+        cdsServerProperties = params;
+        timeoutMs = TimeUnit.MILLISECONDS.convert(params.getTimeout(), TimeUnit.SECONDS);
+    }
+}
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java
new file mode 100644 (file)
index 0000000..efe358b
--- /dev/null
@@ -0,0 +1,203 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 Bell Canada. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Struct.Builder;
+import com.google.protobuf.util.JsonFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.policy.aai.AaiConstants;
+import org.onap.policy.aai.AaiCqResponse;
+import org.onap.policy.cds.client.CdsProcessorGrpcClient;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.controlloop.actor.aai.AaiCustomQueryOperation;
+import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
+import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.impl.OperationPartial;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+
+/**
+ * Operation that uses gRPC to send request to CDS.
+ *
+ */
+@Getter
+public class GrpcOperation extends OperationPartial {
+
+    public static final String NAME = "gRPC";
+
+    private CdsProcessorGrpcClient client;
+
+    /**
+     * Configuration for this operation.
+     */
+    private final GrpcConfig config;
+
+    /**
+     * Constructs the object.
+     *
+     * @param params operation parameters
+     * @param config configuration for this operation
+     */
+    public GrpcOperation(ControlLoopOperationParams params, GrpcConfig config) {
+        super(params, config);
+        this.config = config;
+    }
+
+    /**
+     * If no timeout is specified, then it returns the operator's configured timeout.
+     */
+    @Override
+    protected long getTimeoutMs(Integer timeoutSec) {
+        return (timeoutSec == null || timeoutSec == 0 ? config.getTimeoutMs() : super.getTimeoutMs(timeoutSec));
+    }
+
+    /**
+     * Ensures that A&AI customer query has been performed.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
+        ControlLoopOperationParams cqParams = params.toBuilder().actor(AaiConstants.ACTOR_NAME)
+                        .operation(AaiCustomQueryOperation.NAME).payload(null).retry(null).timeoutSec(null).build();
+
+        // run Custom Query and Guard, in parallel
+        return allOf(() -> params.getContext().obtain(AaiCqResponse.CONTEXT_KEY, cqParams), this::startGuardAsync);
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
+
+        CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
+        client = new CdsProcessorGrpcClient(new CdsActorServiceManager(outcome, future),
+                        config.getCdsServerProperties());
+
+        ExecutionServiceInput request = constructRequest(params);
+        client.sendRequest(request);
+        return future;
+    }
+
+    /**
+     * Build the CDS ExecutionServiceInput request from the policy object and the AAI
+     * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD
+     * item for Frankfurt release.
+     *
+     * @param params the control loop parameters specifying the onset, payload, etc.
+     * @return an ExecutionServiceInput instance.
+     */
+    public ExecutionServiceInput constructRequest(ControlLoopOperationParams params) {
+
+        // For the current operational TOSCA policy model (yaml) CBA name and version are
+        // embedded in the payload
+        // section, with the new policy type model being proposed in Frankfurt we will be
+        // able to move it out.
+        if (!validateCdsMandatoryParams(params)) {
+            throw new IllegalArgumentException("missing cds mandatory params -  " + params);
+        }
+        Map<String, String> payload = convertPayloadMap(params.getPayload());
+        String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
+        String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
+
+        // Retain only the payload by removing CBA name and version once they are
+        // extracted
+        // to be put in CDS request header.
+        payload.remove(CdsActorConstants.KEY_CBA_NAME);
+        payload.remove(CdsActorConstants.KEY_CBA_VERSION);
+
+        // Embed payload from policy to ConfigDeployRequest object, serialize and inject
+        // into grpc request.
+        String cbaActionName = params.getOperation();
+        CdsActionRequest request = new CdsActionRequest();
+        request.setPolicyPayload(payload);
+        request.setActionName(cbaActionName);
+        request.setResolutionKey(UUID.randomUUID().toString());
+
+        // Inject AAI properties into payload map. Offer flexibility to the usecase
+        // implementation to inject whatever AAI parameters are of interest to them.
+        // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as
+        // needed by CDS.
+        request.setAaiProperties(params.getContext().getEnrichment());
+
+        // Inject any additional event parameters that may be present in the onset event
+        if (params.getContext().getEvent().getAdditionalEventParams() != null) {
+            request.setAdditionalEventParams(params.getContext().getEvent().getAdditionalEventParams());
+        }
+
+        Builder struct = Struct.newBuilder();
+        try {
+            String requestStr = request.generateCdsPayload();
+            Preconditions.checkState(!Strings.isNullOrEmpty(requestStr),
+                            "Unable to build " + "config-deploy-request from payload parameters: {}", payload);
+            JsonFormat.parser().merge(requestStr, struct);
+        } catch (InvalidProtocolBufferException | CoderException e) {
+            throw new IllegalArgumentException("Failed to embed CDS payload string into the input request. blueprint({"
+                            + cbaName + "}:{" + cbaVersion + "}) for action({" + cbaActionName + "})", e);
+        }
+
+        // Build CDS gRPC request common-header
+        CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
+                        .setRequestId(params.getContext().getEvent().getRequestId().toString())
+                        .setSubRequestId(Integer.toString(0)).build();
+
+        // Build CDS gRPC request action-identifier
+        ActionIdentifiers actionIdentifiers =
+                        ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion)
+                                        .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build();
+
+        // Finally build & return the ExecutionServiceInput gRPC request object.
+        return ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader).setActionIdentifiers(actionIdentifiers)
+                        .setPayload(struct.build()).build();
+    }
+
+    private Map<String, String> convertPayloadMap(Map<String, Object> payload) {
+        Map<String, String> convertedPayload = new HashMap<>();
+        for (Entry<String, Object> entry : payload.entrySet()) {
+            convertedPayload.put(entry.getKey(), entry.getValue().toString());
+        }
+        return convertedPayload;
+    }
+
+    private boolean validateCdsMandatoryParams(ControlLoopOperationParams params) {
+        if (params == null || params.getPayload() == null) {
+            return false;
+        }
+        Map<String, Object> payload = params.getPayload();
+        if (payload.get(CdsActorConstants.KEY_CBA_NAME) == null
+                        || payload.get(CdsActorConstants.KEY_CBA_VERSION) == null) {
+            return false;
+        }
+        return !Strings.isNullOrEmpty(payload.get(CdsActorConstants.KEY_CBA_NAME).toString())
+                        && !Strings.isNullOrEmpty(payload.get(CdsActorConstants.KEY_CBA_VERSION).toString())
+                        && !Strings.isNullOrEmpty(params.getOperation());
+    }
+}
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperator.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperator.java
new file mode 100644 (file)
index 0000000..cc38d72
--- /dev/null
@@ -0,0 +1,106 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 Bell Canada. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds;
+
+import java.util.Map;
+import lombok.Getter;
+import org.onap.policy.cds.properties.CdsServerProperties;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.impl.OperationMaker;
+import org.onap.policy.controlloop.actorserviceprovider.impl.OperatorPartial;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+
+/**
+ * Operator that uses gRPC. The operator's parameters must be a
+ * {@link CdsServerProperties}.
+ */
+@Getter
+public class GrpcOperator extends OperatorPartial {
+
+    /**
+     * Function to make an operation.
+     */
+    private final OperationMaker<GrpcConfig, GrpcOperation> operationMaker;
+
+    /**
+     * Current configuration. This is set by {@link #doConfigure(Map)}.
+     */
+    private GrpcConfig currentConfig;
+
+    /**
+     * Constructs the object.
+     *
+     * @param actorName name of the actor with which this operator is associated
+     * @param name operation name
+     */
+    public GrpcOperator(String actorName, String name) {
+        this(actorName, name, null);
+    }
+
+    /**
+     * Constructs the object.
+     *
+     * @param actorName name of the actor with which this operator is associated
+     * @param name operation name
+     * @param operationMaker function to make an operation
+     */
+    public GrpcOperator(String actorName, String name, OperationMaker<GrpcConfig, GrpcOperation> operationMaker) {
+        super(actorName, name);
+        this.operationMaker = operationMaker;
+    }
+
+    /**
+     * Translates the parameters to an {@link CdsServerProperties} and then extracts the
+     * relevant values.
+     */
+    @Override
+    protected void doConfigure(Map<String, Object> parameters) {
+        currentConfig = makeConfiguration(parameters);
+    }
+
+    /**
+     * Makes a new configuration using the specified parameters.
+     *
+     * @param parameters operator parameters
+     * @return a new configuration
+     */
+    protected GrpcConfig makeConfiguration(Map<String, Object> parameters) {
+        CdsServerProperties params = Util.translate(getFullName(), parameters, CdsServerProperties.class);
+        ValidationResult result = params.validate();
+        if (!result.isValid()) {
+            throw new ParameterValidationRuntimeException("invalid parameters", result);
+        }
+        return new GrpcConfig(getBlockingExecutor(), params);
+    }
+
+    /**
+     * {@inheritDoc}.
+     */
+    @Override
+    public Operation buildOperation(ControlLoopOperationParams params) {
+        if (operationMaker == null) {
+            throw new UnsupportedOperationException("cannot make operation for " + getFullName());
+        }
+        verifyRunning();
+        return operationMaker.apply(params, currentConfig);
+    }
+}