2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Bell Canada. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.controlloop.actor.cds;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Strings;
23 import com.google.protobuf.InvalidProtocolBufferException;
24 import com.google.protobuf.Struct;
25 import com.google.protobuf.Struct.Builder;
26 import com.google.protobuf.util.JsonFormat;
27 import io.grpc.Status;
28 import java.util.ArrayList;
29 import java.util.List;
31 import java.util.Optional;
32 import java.util.UUID;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicReference;
36 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
41 import org.onap.policy.cds.CdsResponse;
42 import org.onap.policy.cds.api.CdsProcessorListener;
43 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
44 import org.onap.policy.cds.properties.CdsServerProperties;
45 import org.onap.policy.controlloop.ControlLoopOperation;
46 import org.onap.policy.controlloop.VirtualControlLoopEvent;
47 import org.onap.policy.controlloop.actor.cds.beans.CdsActionRequest;
48 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
49 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
50 import org.onap.policy.controlloop.policy.Policy;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
57 public class CdsActorServiceProvider implements Actor {
59 private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
65 public String actor() {
66 return CdsActorConstants.CDS_ACTOR;
70 * {@inheritDoc}. Note: This is a placeholder for now.
73 public List<String> recipes() {
74 return new ArrayList<>();
78 * {@inheritDoc}. Note: This is a placeholder for now.
81 public List<String> recipeTargets(final String recipe) {
82 return new ArrayList<>();
86 * {@inheritDoc}. Note: This is a placeholder for now.
89 public List<String> recipePayloads(final String recipe) {
90 return new ArrayList<>();
94 * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
95 * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
97 * @param onset the event that is reporting the alert for policy to perform an action.
98 * @param operation the control loop operation specifying the actor, operation, target, etc.
99 * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
100 * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
101 * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
103 public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
104 ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
106 // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
107 // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
108 Map<String, String> payload = policy.getPayload();
109 if (!validateCdsMandatoryParams(policy)) {
110 return Optional.empty();
112 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
113 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
114 String cbaActionName = policy.getRecipe();
116 // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
117 CdsActionRequest request = new CdsActionRequest();
118 request.setConfigDeployProperties(payload);
119 request.setActionName(cbaActionName);
120 request.setResolutionKey(UUID.randomUUID().toString());
122 // Inject AAI properties into payload map. Offer flexibility to the usecase
123 // implementation to inject whatever AAI parameters are of interest to them.
124 // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
125 request.setAaiProperties(aaiParams);
127 Builder struct = Struct.newBuilder();
129 String requestStr = request.toString();
130 Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
131 + "config-deploy-request from payload parameters: {}", payload);
132 JsonFormat.parser().merge(requestStr, struct);
133 } catch (InvalidProtocolBufferException e) {
134 LOGGER.error("Failed to parse received message. blueprint({}:{}) for action({})", cbaName, cbaVersion,
136 return Optional.empty();
139 // Build CDS gRPC request common-header
140 CommonHeader commonHeader = CommonHeader.newBuilder()
141 .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
142 .setRequestId(onset.getRequestId().toString())
143 .setSubRequestId(operation.getSubRequestId())
146 // Build CDS gRPC request action-identifier
147 ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
148 .setBlueprintName(cbaName)
149 .setBlueprintVersion(cbaVersion)
150 .setActionName(cbaActionName)
151 .setMode(CdsActorConstants.CDS_MODE)
154 // Finally build the ExecutionServiceInput gRPC request object.
155 ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
156 .setCommonHeader(commonHeader)
157 .setActionIdentifiers(actionIdentifiers)
158 .setPayload(struct.build())
160 return Optional.of(executionServiceInput);
163 private boolean validateCdsMandatoryParams(Policy policy) {
164 if (policy == null || policy.getPayload() == null) {
167 Map<String, String> payload = policy.getPayload();
168 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
169 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
170 String cbaActionName = policy.getRecipe();
171 return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
172 .isNullOrEmpty(cbaActionName);
175 public class CdsActorServiceManager implements CdsProcessorListener {
177 private final AtomicReference<String> cdsStatus = new AtomicReference<>();
183 public void onMessage(final ExecutionServiceOutput message) {
184 LOGGER.info("Received notification from CDS: {}", message);
185 EventType eventType = message.getStatus().getEventType();
187 case EVENT_COMPONENT_FAILURE:
188 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
190 case EVENT_COMPONENT_PROCESSING:
191 cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
193 case EVENT_COMPONENT_EXECUTED:
194 cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
197 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
206 public void onError(final Throwable throwable) {
207 Status status = Status.fromThrowable(throwable);
208 cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
209 LOGGER.error("Failed processing blueprint {} {}", status, throwable);
213 * Send gRPC request to CDS to execute the blueprint.
215 * @param cdsClient CDS grpc client object.
216 * @param cdsProps CDS properties.
217 * @param executionServiceInput a valid CDS grpc request object.
218 * @return the cds response.
220 public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
221 ExecutionServiceInput executionServiceInput) {
223 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
224 // TO-DO: Handle requests asynchronously once the callback support is added to actors.
225 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
226 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
228 cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
230 LOGGER.info("CDS status response {}", getCdsStatus());
231 } catch (InterruptedException ex) {
232 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
233 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
234 Thread.currentThread().interrupt();
236 LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
238 CdsResponse response = new CdsResponse();
239 response.setRequestId(
240 executionServiceInput != null && executionServiceInput.getCommonHeader() != null
241 ? executionServiceInput.getCommonHeader().getRequestId() : null);
242 response.setStatus(this.getCdsStatus());
246 String getCdsStatus() {
247 return cdsStatus.get();