X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=models-interactions%2Fmodel-actors%2Factor.cds%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fcontrolloop%2Factor%2Fcds%2FCdsActor.java;h=2a49c812dfa1cf7ff374816c9f8fe6d608265fac;hb=c88676ec64c4c870252502bc1cfaa8990c8fd964;hp=d630d18aee55c586aa639471b5f14af7d3ea4016;hpb=e4e7d15db6d2f79658e3a5f9e8326ea092afcfab;p=policy%2Fmodels.git diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActor.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActor.java index d630d18ae..2a49c812d 100644 --- a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActor.java +++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActor.java @@ -19,40 +19,9 @@ package org.onap.policy.controlloop.actor.cds; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Struct; -import com.google.protobuf.Struct.Builder; -import com.google.protobuf.util.JsonFormat; -import io.grpc.Status; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; -import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader; -import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType; -import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; -import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; -import org.onap.policy.cds.CdsResponse; -import org.onap.policy.cds.api.CdsProcessorListener; -import org.onap.policy.cds.client.CdsProcessorGrpcClient; -import org.onap.policy.cds.properties.CdsServerProperties; -import org.onap.policy.common.utils.coder.CoderException; -import org.onap.policy.controlloop.ControlLoopOperation; -import org.onap.policy.controlloop.VirtualControlLoopEvent; import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants; -import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest; import org.onap.policy.controlloop.actorserviceprovider.Operator; import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl; -import org.onap.policy.controlloop.policy.Policy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * CDS is an unusual actor in that it uses a single, generic operator to initiate all @@ -61,8 +30,6 @@ import org.slf4j.LoggerFactory; public class CdsActor extends ActorImpl { public static final String NAME = CdsActorConstants.CDS_ACTOR; - private static final Logger LOGGER = LoggerFactory.getLogger(CdsActor.class); - /** * Constructs the object. */ @@ -79,210 +46,4 @@ public class CdsActor extends ActorImpl { */ return super.getOperator(GrpcOperation.NAME); } - - // TODO old code: remove lines down to **HERE** - - /** - * {@inheritDoc}. - */ - @Override - public String actor() { - return CdsActorConstants.CDS_ACTOR; - } - - /** - * {@inheritDoc}. Note: This is a placeholder for now. - */ - @Override - public List recipes() { - return new ArrayList<>(); - } - - /** - * {@inheritDoc}. Note: This is a placeholder for now. - */ - @Override - public List recipeTargets(final String recipe) { - return new ArrayList<>(); - } - - /** - * {@inheritDoc}. Note: This is a placeholder for now. - */ - @Override - public List recipePayloads(final String recipe) { - return new ArrayList<>(); - } - - /** - * Build the CDS ExecutionServiceInput request from the policy object and the AAI - * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD - * item for Frankfurt release. - * - * @param onset the event that is reporting the alert for policy to perform an action. - * @param operation the control loop operation specifying the actor, operation, - * target, etc. - * @param policy the policy specified from the yaml generated by CLAMP or through - * Policy API. - * @param aaiParams Map of enriched AAI attributes in node.attribute notation. - * @return an Optional ExecutionServiceInput instance if valid else an Optional empty - * object is returned. - */ - public Optional constructRequest(VirtualControlLoopEvent onset, - ControlLoopOperation operation, Policy policy, Map aaiParams) { - - // For the current operational TOSCA policy model (yaml) CBA name and version are - // embedded in the payload - // section, with the new policy type model being proposed in Frankfurt we will be - // able to move it out. - Map payload = policy.getPayload(); - if (!validateCdsMandatoryParams(policy)) { - return Optional.empty(); - } - String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME); - String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION); - - // Retain only the payload by removing CBA name and version once they are - // extracted - // to be put in CDS request header. - payload.remove(CdsActorConstants.KEY_CBA_NAME); - payload.remove(CdsActorConstants.KEY_CBA_VERSION); - - // Embed payload from policy to ConfigDeployRequest object, serialize and inject - // into grpc request. - String cbaActionName = policy.getRecipe(); - CdsActionRequest request = new CdsActionRequest(); - request.setPolicyPayload(payload); - request.setActionName(cbaActionName); - request.setResolutionKey(UUID.randomUUID().toString()); - - // Inject AAI properties into payload map. Offer flexibility to the usecase - // implementation to inject whatever AAI parameters are of interest to them. - // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as - // needed by CDS. - request.setAaiProperties(aaiParams); - - // Inject any additional event parameters that may be present in the onset event - if (onset.getAdditionalEventParams() != null) { - request.setAdditionalEventParams(onset.getAdditionalEventParams()); - } - - Builder struct = Struct.newBuilder(); - try { - String requestStr = request.generateCdsPayload(); - Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), - "Unable to build " + "config-deploy-request from payload parameters: {}", payload); - JsonFormat.parser().merge(requestStr, struct); - } catch (InvalidProtocolBufferException | CoderException e) { - LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})", - cbaName, cbaVersion, cbaActionName, e); - return Optional.empty(); - } - - // Build CDS gRPC request common-header - CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID) - .setRequestId(onset.getRequestId().toString()).setSubRequestId(operation.getSubRequestId()) - .build(); - - // Build CDS gRPC request action-identifier - ActionIdentifiers actionIdentifiers = - ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion) - .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build(); - - // Finally build the ExecutionServiceInput gRPC request object. - ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader) - .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build(); - return Optional.of(executionServiceInput); - } - - private boolean validateCdsMandatoryParams(Policy policy) { - if (policy == null || policy.getPayload() == null) { - return false; - } - Map payload = policy.getPayload(); - String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME); - String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION); - String cbaActionName = policy.getRecipe(); - return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) - && !Strings.isNullOrEmpty(cbaActionName); - } - - public class CdsActorServiceManager implements CdsProcessorListener { - - private final AtomicReference cdsStatus = new AtomicReference<>(); - - /** - * {@inheritDoc}. - */ - @Override - public void onMessage(final ExecutionServiceOutput message) { - LOGGER.info("Received notification from CDS: {}", message); - EventType eventType = message.getStatus().getEventType(); - switch (eventType) { - case EVENT_COMPONENT_FAILURE: - cdsStatus.compareAndSet(null, CdsActorConstants.FAILED); - break; - case EVENT_COMPONENT_PROCESSING: - cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING); - break; - case EVENT_COMPONENT_EXECUTED: - cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS); - break; - default: - cdsStatus.compareAndSet(null, CdsActorConstants.FAILED); - break; - } - } - - /** - * {@inheritDoc}. - */ - @Override - public void onError(final Throwable throwable) { - Status status = Status.fromThrowable(throwable); - cdsStatus.compareAndSet(null, CdsActorConstants.ERROR); - LOGGER.error("Failed processing blueprint {}", status, throwable); - } - - /** - * Send gRPC request to CDS to execute the blueprint. - * - * @param cdsClient CDS grpc client object. - * @param cdsProps CDS properties. - * @param executionServiceInput a valid CDS grpc request object. - * @return the cds response. - */ - public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps, - ExecutionServiceInput executionServiceInput) { - try { - LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput); - // TO-DO: Handle requests asynchronously once the callback support is - // added to actors. - CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput); - boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS); - if (!status) { - cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT); - } - LOGGER.info("CDS status response {}", getCdsStatus()); - } catch (InterruptedException ex) { - LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex); - cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED); - Thread.currentThread().interrupt(); - } - LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus()); - - CdsResponse response = new CdsResponse(); - response.setRequestId(executionServiceInput != null && executionServiceInput.getCommonHeader() != null - ? executionServiceInput.getCommonHeader().getRequestId() - : null); - response.setStatus(this.getCdsStatus()); - return response; - } - - String getCdsStatus() { - return cdsStatus.get(); - } - } - - // **HERE** }