/*-
 * ============LICENSE_START=======================================================
 * Copyright (C) 2019 Bell Canada. All rights reserved.
 * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.policy.controlloop.actor.cds;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Struct;
import com.google.protobuf.Struct.Builder;
import com.google.protobuf.util.JsonFormat;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
import org.onap.policy.cds.CdsResponse;
import org.onap.policy.cds.api.CdsProcessorListener;
import org.onap.policy.cds.client.CdsProcessorGrpcClient;
import org.onap.policy.cds.properties.CdsServerProperties;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl;
import org.onap.policy.controlloop.policy.Policy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

57 * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
59 public class CdsActorServiceProvider extends ActorImpl {
61 private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
63 public CdsActorServiceProvider() {
64 super(CdsActorConstants.CDS_ACTOR);
71 public String actor() {
72 return CdsActorConstants.CDS_ACTOR;
76 * {@inheritDoc}. Note: This is a placeholder for now.
79 public List<String> recipes() {
80 return new ArrayList<>();
84 * {@inheritDoc}. Note: This is a placeholder for now.
87 public List<String> recipeTargets(final String recipe) {
88 return new ArrayList<>();
92 * {@inheritDoc}. Note: This is a placeholder for now.
95 public List<String> recipePayloads(final String recipe) {
96 return new ArrayList<>();
100 * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
101 * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
103 * @param onset the event that is reporting the alert for policy to perform an action.
104 * @param operation the control loop operation specifying the actor, operation, target, etc.
105 * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
106 * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
107 * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
109 public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
110 ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
112 // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
113 // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
114 Map<String, String> payload = policy.getPayload();
115 if (!validateCdsMandatoryParams(policy)) {
116 return Optional.empty();
118 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
119 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
121 // Retain only the payload by removing CBA name and version once they are extracted
122 // to be put in CDS request header.
123 payload.remove(CdsActorConstants.KEY_CBA_NAME);
124 payload.remove(CdsActorConstants.KEY_CBA_VERSION);
126 // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
127 String cbaActionName = policy.getRecipe();
128 CdsActionRequest request = new CdsActionRequest();
129 request.setPolicyPayload(payload);
130 request.setActionName(cbaActionName);
131 request.setResolutionKey(UUID.randomUUID().toString());
133 // Inject AAI properties into payload map. Offer flexibility to the usecase
134 // implementation to inject whatever AAI parameters are of interest to them.
135 // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
136 request.setAaiProperties(aaiParams);
138 // Inject any additional event parameters that may be present in the onset event
139 if (onset.getAdditionalEventParams() != null) {
140 request.setAdditionalEventParams(onset.getAdditionalEventParams());
143 Builder struct = Struct.newBuilder();
145 String requestStr = request.generateCdsPayload();
146 Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
147 + "config-deploy-request from payload parameters: {}", payload);
148 JsonFormat.parser().merge(requestStr, struct);
149 } catch (InvalidProtocolBufferException | CoderException e) {
150 LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
151 cbaName, cbaVersion, cbaActionName, e);
152 return Optional.empty();
155 // Build CDS gRPC request common-header
156 CommonHeader commonHeader = CommonHeader.newBuilder()
157 .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
158 .setRequestId(onset.getRequestId().toString())
159 .setSubRequestId(operation.getSubRequestId())
162 // Build CDS gRPC request action-identifier
163 ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
164 .setBlueprintName(cbaName)
165 .setBlueprintVersion(cbaVersion)
166 .setActionName(cbaActionName)
167 .setMode(CdsActorConstants.CDS_MODE)
170 // Finally build the ExecutionServiceInput gRPC request object.
171 ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
172 .setCommonHeader(commonHeader)
173 .setActionIdentifiers(actionIdentifiers)
174 .setPayload(struct.build())
176 return Optional.of(executionServiceInput);
179 private boolean validateCdsMandatoryParams(Policy policy) {
180 if (policy == null || policy.getPayload() == null) {
183 Map<String, String> payload = policy.getPayload();
184 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
185 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
186 String cbaActionName = policy.getRecipe();
187 return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
188 .isNullOrEmpty(cbaActionName);
191 public class CdsActorServiceManager implements CdsProcessorListener {
193 private final AtomicReference<String> cdsStatus = new AtomicReference<>();
199 public void onMessage(final ExecutionServiceOutput message) {
200 LOGGER.info("Received notification from CDS: {}", message);
201 EventType eventType = message.getStatus().getEventType();
203 case EVENT_COMPONENT_FAILURE:
204 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
206 case EVENT_COMPONENT_PROCESSING:
207 cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
209 case EVENT_COMPONENT_EXECUTED:
210 cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
213 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
222 public void onError(final Throwable throwable) {
223 Status status = Status.fromThrowable(throwable);
224 cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
225 LOGGER.error("Failed processing blueprint {} {}", status, throwable);
229 * Send gRPC request to CDS to execute the blueprint.
231 * @param cdsClient CDS grpc client object.
232 * @param cdsProps CDS properties.
233 * @param executionServiceInput a valid CDS grpc request object.
234 * @return the cds response.
236 public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
237 ExecutionServiceInput executionServiceInput) {
239 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
240 // TO-DO: Handle requests asynchronously once the callback support is added to actors.
241 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
242 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
244 cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
246 LOGGER.info("CDS status response {}", getCdsStatus());
247 } catch (InterruptedException ex) {
248 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
249 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
250 Thread.currentThread().interrupt();
252 LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
254 CdsResponse response = new CdsResponse();
255 response.setRequestId(
256 executionServiceInput != null && executionServiceInput.getCommonHeader() != null
257 ? executionServiceInput.getCommonHeader().getRequestId() : null);
258 response.setStatus(this.getCdsStatus());
262 String getCdsStatus() {
263 return cdsStatus.get();