2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Bell Canada. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.controlloop.actor.cds;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Struct;
import com.google.protobuf.Struct.Builder;
import com.google.protobuf.util.JsonFormat;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
import org.onap.policy.cds.CdsResponse;
import org.onap.policy.cds.api.CdsProcessorListener;
import org.onap.policy.cds.client.CdsProcessorGrpcClient;
import org.onap.policy.cds.properties.CdsServerProperties;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
import org.onap.policy.controlloop.actor.cds.beans.CdsActionRequest;
import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
import org.onap.policy.controlloop.policy.Policy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * CDS Actor service-provider implementation. This is a deploy-dark feature for the El-Alto release.
 */
57 public class CdsActorServiceProvider implements Actor {
59 private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
65 public String actor() {
66 return CdsActorConstants.CDS_ACTOR;
70 * {@inheritDoc}. Note: This is a placeholder for now.
73 public List<String> recipes() {
74 return new ArrayList<>();
78 * {@inheritDoc}. Note: This is a placeholder for now.
81 public List<String> recipeTargets(final String recipe) {
82 return new ArrayList<>();
86 * {@inheritDoc}. Note: This is a placeholder for now.
89 public List<String> recipePayloads(final String recipe) {
90 return new ArrayList<>();
94 * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
95 * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
97 * @param onset the event that is reporting the alert for policy to perform an action.
98 * @param operation the control loop operation specifying the actor, operation, target, etc.
99 * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
100 * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
101 * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
103 public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
104 ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
106 // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
107 // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
108 Map<String, String> payload = policy.getPayload();
109 if (!validateCdsMandatoryParams(policy)) {
110 return Optional.empty();
112 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
113 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
115 // Retain only the payload by removing CBA name and version once they are extracted
116 // to be put in CDS request header.
117 payload.remove(CdsActorConstants.KEY_CBA_NAME);
118 payload.remove(CdsActorConstants.KEY_CBA_VERSION);
120 // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
121 String cbaActionName = policy.getRecipe();
122 CdsActionRequest request = new CdsActionRequest();
123 request.setPolicyPayload(payload);
124 request.setActionName(cbaActionName);
125 request.setResolutionKey(UUID.randomUUID().toString());
127 // Inject AAI properties into payload map. Offer flexibility to the usecase
128 // implementation to inject whatever AAI parameters are of interest to them.
129 // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
130 request.setAaiProperties(aaiParams);
132 Builder struct = Struct.newBuilder();
134 String requestStr = request.toString();
135 Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
136 + "config-deploy-request from payload parameters: {}", payload);
137 JsonFormat.parser().merge(requestStr, struct);
138 } catch (InvalidProtocolBufferException e) {
139 LOGGER.error("Failed to parse received message. blueprint({}:{}) for action({})", cbaName, cbaVersion,
141 return Optional.empty();
144 // Build CDS gRPC request common-header
145 CommonHeader commonHeader = CommonHeader.newBuilder()
146 .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
147 .setRequestId(onset.getRequestId().toString())
148 .setSubRequestId(operation.getSubRequestId())
151 // Build CDS gRPC request action-identifier
152 ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
153 .setBlueprintName(cbaName)
154 .setBlueprintVersion(cbaVersion)
155 .setActionName(cbaActionName)
156 .setMode(CdsActorConstants.CDS_MODE)
159 // Finally build the ExecutionServiceInput gRPC request object.
160 ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
161 .setCommonHeader(commonHeader)
162 .setActionIdentifiers(actionIdentifiers)
163 .setPayload(struct.build())
165 return Optional.of(executionServiceInput);
168 private boolean validateCdsMandatoryParams(Policy policy) {
169 if (policy == null || policy.getPayload() == null) {
172 Map<String, String> payload = policy.getPayload();
173 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
174 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
175 String cbaActionName = policy.getRecipe();
176 return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
177 .isNullOrEmpty(cbaActionName);
180 public class CdsActorServiceManager implements CdsProcessorListener {
182 private final AtomicReference<String> cdsStatus = new AtomicReference<>();
188 public void onMessage(final ExecutionServiceOutput message) {
189 LOGGER.info("Received notification from CDS: {}", message);
190 EventType eventType = message.getStatus().getEventType();
192 case EVENT_COMPONENT_FAILURE:
193 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
195 case EVENT_COMPONENT_PROCESSING:
196 cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
198 case EVENT_COMPONENT_EXECUTED:
199 cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
202 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
211 public void onError(final Throwable throwable) {
212 Status status = Status.fromThrowable(throwable);
213 cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
214 LOGGER.error("Failed processing blueprint {} {}", status, throwable);
218 * Send gRPC request to CDS to execute the blueprint.
220 * @param cdsClient CDS grpc client object.
221 * @param cdsProps CDS properties.
222 * @param executionServiceInput a valid CDS grpc request object.
223 * @return the cds response.
225 public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
226 ExecutionServiceInput executionServiceInput) {
228 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
229 // TO-DO: Handle requests asynchronously once the callback support is added to actors.
230 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
231 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
233 cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
235 LOGGER.info("CDS status response {}", getCdsStatus());
236 } catch (InterruptedException ex) {
237 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
238 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
239 Thread.currentThread().interrupt();
241 LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
243 CdsResponse response = new CdsResponse();
244 response.setRequestId(
245 executionServiceInput != null && executionServiceInput.getCommonHeader() != null
246 ? executionServiceInput.getCommonHeader().getRequestId() : null);
247 response.setStatus(this.getCdsStatus());
251 String getCdsStatus() {
252 return cdsStatus.get();