/*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Bell Canada. All rights reserved. * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= */ package org.onap.policy.controlloop.actor.cds; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Struct; import com.google.protobuf.Struct.Builder; import com.google.protobuf.util.JsonFormat; import io.grpc.Status; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader; import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; import org.onap.policy.cds.CdsResponse; import org.onap.policy.cds.api.CdsProcessorListener; import org.onap.policy.cds.client.CdsProcessorGrpcClient; import org.onap.policy.cds.properties.CdsServerProperties; import org.onap.policy.common.utils.coder.CoderException; import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.VirtualControlLoopEvent; import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants; import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest; import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl; import org.onap.policy.controlloop.policy.Policy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto * release. */ public class CdsActorServiceProvider extends ActorImpl { private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class); /** * Constructs the object. */ public CdsActorServiceProvider() { super(CdsActorConstants.CDS_ACTOR); addOperator(new GrpcOperator(CdsActorConstants.CDS_ACTOR, GrpcOperation.NAME, GrpcOperation::new)); } // TODO old code: remove lines down to **HERE** /** * {@inheritDoc}. */ @Override public String actor() { return CdsActorConstants.CDS_ACTOR; } /** * {@inheritDoc}. Note: This is a placeholder for now. */ @Override public List recipes() { return new ArrayList<>(); } /** * {@inheritDoc}. Note: This is a placeholder for now. */ @Override public List recipeTargets(final String recipe) { return new ArrayList<>(); } /** * {@inheritDoc}. Note: This is a placeholder for now. */ @Override public List recipePayloads(final String recipe) { return new ArrayList<>(); } /** * Build the CDS ExecutionServiceInput request from the policy object and the AAI * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD * item for Frankfurt release. * * @param onset the event that is reporting the alert for policy to perform an action. * @param operation the control loop operation specifying the actor, operation, * target, etc. * @param policy the policy specified from the yaml generated by CLAMP or through * Policy API. * @param aaiParams Map of enriched AAI attributes in node.attribute notation. * @return an Optional ExecutionServiceInput instance if valid else an Optional empty * object is returned. */ public Optional constructRequest(VirtualControlLoopEvent onset, ControlLoopOperation operation, Policy policy, Map aaiParams) { // For the current operational TOSCA policy model (yaml) CBA name and version are // embedded in the payload // section, with the new policy type model being proposed in Frankfurt we will be // able to move it out. Map payload = policy.getPayload(); if (!validateCdsMandatoryParams(policy)) { return Optional.empty(); } String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME); String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION); // Retain only the payload by removing CBA name and version once they are // extracted // to be put in CDS request header. payload.remove(CdsActorConstants.KEY_CBA_NAME); payload.remove(CdsActorConstants.KEY_CBA_VERSION); // Embed payload from policy to ConfigDeployRequest object, serialize and inject // into grpc request. String cbaActionName = policy.getRecipe(); CdsActionRequest request = new CdsActionRequest(); request.setPolicyPayload(payload); request.setActionName(cbaActionName); request.setResolutionKey(UUID.randomUUID().toString()); // Inject AAI properties into payload map. Offer flexibility to the usecase // implementation to inject whatever AAI parameters are of interest to them. // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as // needed by CDS. request.setAaiProperties(aaiParams); // Inject any additional event parameters that may be present in the onset event if (onset.getAdditionalEventParams() != null) { request.setAdditionalEventParams(onset.getAdditionalEventParams()); } Builder struct = Struct.newBuilder(); try { String requestStr = request.generateCdsPayload(); Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build " + "config-deploy-request from payload parameters: {}", payload); JsonFormat.parser().merge(requestStr, struct); } catch (InvalidProtocolBufferException | CoderException e) { LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})", cbaName, cbaVersion, cbaActionName, e); return Optional.empty(); } // Build CDS gRPC request common-header CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID) .setRequestId(onset.getRequestId().toString()).setSubRequestId(operation.getSubRequestId()) .build(); // Build CDS gRPC request action-identifier ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion) .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build(); // Finally build the ExecutionServiceInput gRPC request object. ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader) .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build(); return Optional.of(executionServiceInput); } private boolean validateCdsMandatoryParams(Policy policy) { if (policy == null || policy.getPayload() == null) { return false; } Map payload = policy.getPayload(); String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME); String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION); String cbaActionName = policy.getRecipe(); return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings.isNullOrEmpty(cbaActionName); } public class CdsActorServiceManager implements CdsProcessorListener { private final AtomicReference cdsStatus = new AtomicReference<>(); /** * {@inheritDoc}. */ @Override public void onMessage(final ExecutionServiceOutput message) { LOGGER.info("Received notification from CDS: {}", message); EventType eventType = message.getStatus().getEventType(); switch (eventType) { case EVENT_COMPONENT_FAILURE: cdsStatus.compareAndSet(null, CdsActorConstants.FAILED); break; case EVENT_COMPONENT_PROCESSING: cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING); break; case EVENT_COMPONENT_EXECUTED: cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS); break; default: cdsStatus.compareAndSet(null, CdsActorConstants.FAILED); break; } } /** * {@inheritDoc}. */ @Override public void onError(final Throwable throwable) { Status status = Status.fromThrowable(throwable); cdsStatus.compareAndSet(null, CdsActorConstants.ERROR); LOGGER.error("Failed processing blueprint {} {}", status, throwable); } /** * Send gRPC request to CDS to execute the blueprint. * * @param cdsClient CDS grpc client object. * @param cdsProps CDS properties. * @param executionServiceInput a valid CDS grpc request object. * @return the cds response. */ public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps, ExecutionServiceInput executionServiceInput) { try { LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput); // TO-DO: Handle requests asynchronously once the callback support is // added to actors. CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput); boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS); if (!status) { cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT); } LOGGER.info("CDS status response {}", getCdsStatus()); } catch (InterruptedException ex) { LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex); cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED); Thread.currentThread().interrupt(); } LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus()); CdsResponse response = new CdsResponse(); response.setRequestId(executionServiceInput != null && executionServiceInput.getCommonHeader() != null ? executionServiceInput.getCommonHeader().getRequestId() : null); response.setStatus(this.getCdsStatus()); return response; } String getCdsStatus() { return cdsStatus.get(); } } // **HERE** }