2  * ============LICENSE_START=======================================================
 
   3  * Copyright (C) 2019 Bell Canada. All rights reserved.
 
   4  * ================================================================================
 
   5  * Licensed under the Apache License, Version 2.0 (the "License");
 
   6  * you may not use this file except in compliance with the License.
 
   7  * You may obtain a copy of the License at
 
   9  *      http://www.apache.org/licenses/LICENSE-2.0
 
  11  * Unless required by applicable law or agreed to in writing, software
 
  12  * distributed under the License is distributed on an "AS IS" BASIS,
 
  13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  * See the License for the specific language governing permissions and
 
  15  * limitations under the License.
 
  16  * ============LICENSE_END=========================================================
 
  19 package org.onap.policy.controlloop.actor.cds;
 
  21 import com.google.common.base.Preconditions;
 
  22 import com.google.common.base.Strings;
 
  23 import com.google.protobuf.InvalidProtocolBufferException;
 
  24 import com.google.protobuf.Struct;
 
  25 import com.google.protobuf.Struct.Builder;
 
  26 import com.google.protobuf.util.JsonFormat;
 
  27 import io.grpc.Status;
 
  28 import java.util.ArrayList;
 
  29 import java.util.List;
 
  31 import java.util.Optional;
 
  32 import java.util.UUID;
 
  33 import java.util.concurrent.CountDownLatch;
 
  34 import java.util.concurrent.TimeUnit;
 
  35 import java.util.concurrent.atomic.AtomicReference;
 
  36 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
 
  37 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
 
  38 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
 
  39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
 
  40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
 
  41 import org.onap.policy.cds.CdsResponse;
 
  42 import org.onap.policy.cds.api.CdsProcessorListener;
 
  43 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
 
  44 import org.onap.policy.cds.properties.CdsServerProperties;
 
  45 import org.onap.policy.common.utils.coder.CoderException;
 
  46 import org.onap.policy.controlloop.ControlLoopOperation;
 
  47 import org.onap.policy.controlloop.VirtualControlLoopEvent;
 
  48 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
 
  49 import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
 
  50 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
 
  51 import org.onap.policy.controlloop.policy.Policy;
 
  52 import org.slf4j.Logger;
 
  53 import org.slf4j.LoggerFactory;
 
  56  * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
 
  58 public class CdsActorServiceProvider implements Actor {
 
  60     private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
 
  66     public String actor() {
 
  67         return CdsActorConstants.CDS_ACTOR;
 
  71      * {@inheritDoc}. Note: This is a placeholder for now.
 
  74     public List<String> recipes() {
 
  75         return new ArrayList<>();
 
  79      * {@inheritDoc}. Note: This is a placeholder for now.
 
  82     public List<String> recipeTargets(final String recipe) {
 
  83         return new ArrayList<>();
 
  87      * {@inheritDoc}. Note: This is a placeholder for now.
 
  90     public List<String> recipePayloads(final String recipe) {
 
  91         return new ArrayList<>();
 
  95      * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
 
  96      * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
 
  98      * @param onset the event that is reporting the alert for policy to perform an action.
 
  99      * @param operation the control loop operation specifying the actor, operation, target, etc.
 
 100      * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
 
 101      * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
 
 102      * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
 
 104     public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
 
 105         ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
 
 107         // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
 
 108         // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
 
 109         Map<String, String> payload = policy.getPayload();
 
 110         if (!validateCdsMandatoryParams(policy)) {
 
 111             return Optional.empty();
 
 113         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
 
 114         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
 
 116         // Retain only the payload by removing CBA name and version once they are extracted
 
 117         // to be put in CDS request header.
 
 118         payload.remove(CdsActorConstants.KEY_CBA_NAME);
 
 119         payload.remove(CdsActorConstants.KEY_CBA_VERSION);
 
 121         // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
 
 122         String cbaActionName = policy.getRecipe();
 
 123         CdsActionRequest request = new CdsActionRequest();
 
 124         request.setPolicyPayload(payload);
 
 125         request.setActionName(cbaActionName);
 
 126         request.setResolutionKey(UUID.randomUUID().toString());
 
 128         // Inject AAI properties into payload map. Offer flexibility to the usecase
 
 129         // implementation to inject whatever AAI parameters are of interest to them.
 
 130         // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
 
 131         request.setAaiProperties(aaiParams);
 
 133         // Inject any additional event parameters that may be present in the onset event
 
 134         if (onset.getAdditionalEventParams() != null) {
 
 135             request.setAdditionalEventParams(onset.getAdditionalEventParams());
 
 138         Builder struct = Struct.newBuilder();
 
 140             String requestStr = request.generateCdsPayload();
 
 141             Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
 
 142                 + "config-deploy-request from payload parameters: {}", payload);
 
 143             JsonFormat.parser().merge(requestStr, struct);
 
 144         } catch (InvalidProtocolBufferException | CoderException e) {
 
 145             LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
 
 146                     cbaName, cbaVersion, cbaActionName, e);
 
 147             return Optional.empty();
 
 150         // Build CDS gRPC request common-header
 
 151         CommonHeader commonHeader = CommonHeader.newBuilder()
 
 152             .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
 
 153             .setRequestId(onset.getRequestId().toString())
 
 154             .setSubRequestId(operation.getSubRequestId())
 
 157         // Build CDS gRPC request action-identifier
 
 158         ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
 
 159             .setBlueprintName(cbaName)
 
 160             .setBlueprintVersion(cbaVersion)
 
 161             .setActionName(cbaActionName)
 
 162             .setMode(CdsActorConstants.CDS_MODE)
 
 165         // Finally build the ExecutionServiceInput gRPC request object.
 
 166         ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
 
 167             .setCommonHeader(commonHeader)
 
 168             .setActionIdentifiers(actionIdentifiers)
 
 169             .setPayload(struct.build())
 
 171         return Optional.of(executionServiceInput);
 
 174     private boolean validateCdsMandatoryParams(Policy policy) {
 
 175         if (policy == null || policy.getPayload() == null) {
 
 178         Map<String, String> payload = policy.getPayload();
 
 179         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
 
 180         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
 
 181         String cbaActionName = policy.getRecipe();
 
 182         return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
 
 183             .isNullOrEmpty(cbaActionName);
 
 186     public class CdsActorServiceManager implements CdsProcessorListener {
 
 188         private final AtomicReference<String> cdsStatus = new AtomicReference<>();
 
 194         public void onMessage(final ExecutionServiceOutput message) {
 
 195             LOGGER.info("Received notification from CDS: {}", message);
 
 196             EventType eventType = message.getStatus().getEventType();
 
 198                 case EVENT_COMPONENT_FAILURE:
 
 199                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
 
 201                 case EVENT_COMPONENT_PROCESSING:
 
 202                     cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
 
 204                 case EVENT_COMPONENT_EXECUTED:
 
 205                     cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
 
 208                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
 
 217         public void onError(final Throwable throwable) {
 
 218             Status status = Status.fromThrowable(throwable);
 
 219             cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
 
 220             LOGGER.error("Failed processing blueprint {} {}", status, throwable);
 
 224          * Send gRPC request to CDS to execute the blueprint.
 
 226          * @param cdsClient CDS grpc client object.
 
 227          * @param cdsProps CDS properties.
 
 228          * @param executionServiceInput a valid CDS grpc request object.
 
 229          * @return the cds response.
 
 231         public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
 
 232                                             ExecutionServiceInput executionServiceInput) {
 
 234                 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
 
 235                 // TO-DO: Handle requests asynchronously once the callback support is added to actors.
 
 236                 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
 
 237                 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
 
 239                     cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
 
 241                 LOGGER.info("CDS status response {}", getCdsStatus());
 
 242             } catch (InterruptedException ex) {
 
 243                 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
 
 244                 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
 
 245                 Thread.currentThread().interrupt();
 
 247             LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
 
 249             CdsResponse response = new CdsResponse();
 
 250             response.setRequestId(
 
 251                     executionServiceInput != null && executionServiceInput.getCommonHeader() != null
 
 252                         ? executionServiceInput.getCommonHeader().getRequestId() : null);
 
 253             response.setStatus(this.getCdsStatus());
 
 257         String getCdsStatus() {
 
 258             return cdsStatus.get();