2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2020 Bell Canada. All rights reserved.
4 * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
20 package org.onap.policy.controlloop.actor.cds;
22 import com.google.common.base.Preconditions;
23 import com.google.common.base.Strings;
24 import com.google.protobuf.InvalidProtocolBufferException;
25 import com.google.protobuf.Struct;
26 import com.google.protobuf.Struct.Builder;
27 import com.google.protobuf.util.JsonFormat;
28 import io.grpc.Status;
29 import java.util.ArrayList;
30 import java.util.List;
32 import java.util.Optional;
33 import java.util.UUID;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
39 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
41 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
42 import org.onap.policy.cds.CdsResponse;
43 import org.onap.policy.cds.api.CdsProcessorListener;
44 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
45 import org.onap.policy.cds.properties.CdsServerProperties;
46 import org.onap.policy.common.utils.coder.CoderException;
47 import org.onap.policy.controlloop.ControlLoopOperation;
48 import org.onap.policy.controlloop.VirtualControlLoopEvent;
49 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
50 import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
51 import org.onap.policy.controlloop.actorserviceprovider.Operator;
52 import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl;
53 import org.onap.policy.controlloop.policy.Policy;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
58 * CDS is an unusual actor in that it uses a single, generic operator to initiate all
59 * operation types. The action taken is always the same, only the operation name changes.
61 public class CdsActor extends ActorImpl {
62 public static final String NAME = CdsActorConstants.CDS_ACTOR;
64 private static final Logger LOGGER = LoggerFactory.getLogger(CdsActor.class);
67 * Constructs the object.
70 super(CdsActorConstants.CDS_ACTOR);
72 addOperator(new GrpcOperator(CdsActorConstants.CDS_ACTOR, GrpcOperation.NAME, GrpcOperation::new));
76 public Operator getOperator(String name) {
78 * All operations are managed by the same operator, regardless of the name.
80 return super.getOperator(GrpcOperation.NAME);
83 // TODO old code: remove lines down to **HERE**
89 public String actor() {
90 return CdsActorConstants.CDS_ACTOR;
94 * {@inheritDoc}. Note: This is a placeholder for now.
97 public List<String> recipes() {
98 return new ArrayList<>();
102 * {@inheritDoc}. Note: This is a placeholder for now.
105 public List<String> recipeTargets(final String recipe) {
106 return new ArrayList<>();
110 * {@inheritDoc}. Note: This is a placeholder for now.
113 public List<String> recipePayloads(final String recipe) {
114 return new ArrayList<>();
118 * Build the CDS ExecutionServiceInput request from the policy object and the AAI
119 * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD
120 * item for Frankfurt release.
122 * @param onset the event that is reporting the alert for policy to perform an action.
123 * @param operation the control loop operation specifying the actor, operation,
125 * @param policy the policy specified from the yaml generated by CLAMP or through
127 * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
128 * @return an Optional ExecutionServiceInput instance if valid else an Optional empty
129 * object is returned.
131 public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
132 ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
134 // For the current operational TOSCA policy model (yaml) CBA name and version are
135 // embedded in the payload
136 // section, with the new policy type model being proposed in Frankfurt we will be
137 // able to move it out.
138 Map<String, String> payload = policy.getPayload();
139 if (!validateCdsMandatoryParams(policy)) {
140 return Optional.empty();
142 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
143 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
145 // Retain only the payload by removing CBA name and version once they are
147 // to be put in CDS request header.
148 payload.remove(CdsActorConstants.KEY_CBA_NAME);
149 payload.remove(CdsActorConstants.KEY_CBA_VERSION);
151 // Embed payload from policy to ConfigDeployRequest object, serialize and inject
152 // into grpc request.
153 String cbaActionName = policy.getRecipe();
154 CdsActionRequest request = new CdsActionRequest();
155 request.setPolicyPayload(payload);
156 request.setActionName(cbaActionName);
157 request.setResolutionKey(UUID.randomUUID().toString());
159 // Inject AAI properties into payload map. Offer flexibility to the usecase
160 // implementation to inject whatever AAI parameters are of interest to them.
161 // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as
163 request.setAaiProperties(aaiParams);
165 // Inject any additional event parameters that may be present in the onset event
166 if (onset.getAdditionalEventParams() != null) {
167 request.setAdditionalEventParams(onset.getAdditionalEventParams());
170 Builder struct = Struct.newBuilder();
172 String requestStr = request.generateCdsPayload();
173 Preconditions.checkState(!Strings.isNullOrEmpty(requestStr),
174 "Unable to build " + "config-deploy-request from payload parameters: {}", payload);
175 JsonFormat.parser().merge(requestStr, struct);
176 } catch (InvalidProtocolBufferException | CoderException e) {
177 LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
178 cbaName, cbaVersion, cbaActionName, e);
179 return Optional.empty();
182 // Build CDS gRPC request common-header
183 CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
184 .setRequestId(onset.getRequestId().toString()).setSubRequestId(operation.getSubRequestId())
187 // Build CDS gRPC request action-identifier
188 ActionIdentifiers actionIdentifiers =
189 ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion)
190 .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build();
192 // Finally build the ExecutionServiceInput gRPC request object.
193 ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader)
194 .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build();
195 return Optional.of(executionServiceInput);
198 private boolean validateCdsMandatoryParams(Policy policy) {
199 if (policy == null || policy.getPayload() == null) {
202 Map<String, String> payload = policy.getPayload();
203 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
204 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
205 String cbaActionName = policy.getRecipe();
206 return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion)
207 && !Strings.isNullOrEmpty(cbaActionName);
210 public class CdsActorServiceManager implements CdsProcessorListener {
212 private final AtomicReference<String> cdsStatus = new AtomicReference<>();
218 public void onMessage(final ExecutionServiceOutput message) {
219 LOGGER.info("Received notification from CDS: {}", message);
220 EventType eventType = message.getStatus().getEventType();
222 case EVENT_COMPONENT_FAILURE:
223 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
225 case EVENT_COMPONENT_PROCESSING:
226 cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
228 case EVENT_COMPONENT_EXECUTED:
229 cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
232 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
241 public void onError(final Throwable throwable) {
242 Status status = Status.fromThrowable(throwable);
243 cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
244 LOGGER.error("Failed processing blueprint {}", status, throwable);
248 * Send gRPC request to CDS to execute the blueprint.
250 * @param cdsClient CDS grpc client object.
251 * @param cdsProps CDS properties.
252 * @param executionServiceInput a valid CDS grpc request object.
253 * @return the cds response.
255 public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
256 ExecutionServiceInput executionServiceInput) {
258 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
259 // TO-DO: Handle requests asynchronously once the callback support is
261 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
262 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
264 cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
266 LOGGER.info("CDS status response {}", getCdsStatus());
267 } catch (InterruptedException ex) {
268 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
269 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
270 Thread.currentThread().interrupt();
272 LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
274 CdsResponse response = new CdsResponse();
275 response.setRequestId(executionServiceInput != null && executionServiceInput.getCommonHeader() != null
276 ? executionServiceInput.getCommonHeader().getRequestId()
278 response.setStatus(this.getCdsStatus());
282 String getCdsStatus() {
283 return cdsStatus.get();