2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Bell Canada. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.controlloop.actor.cds;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Strings;
23 import com.google.protobuf.InvalidProtocolBufferException;
24 import com.google.protobuf.Struct;
25 import com.google.protobuf.Struct.Builder;
26 import com.google.protobuf.util.JsonFormat;
27 import io.grpc.Status;
28 import java.util.ArrayList;
29 import java.util.List;
31 import java.util.Optional;
32 import java.util.UUID;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicReference;
36 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
41 import org.onap.policy.cds.CdsResponse;
42 import org.onap.policy.cds.api.CdsProcessorListener;
43 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
44 import org.onap.policy.cds.properties.CdsServerProperties;
45 import org.onap.policy.common.utils.coder.CoderException;
46 import org.onap.policy.controlloop.ControlLoopOperation;
47 import org.onap.policy.controlloop.VirtualControlLoopEvent;
48 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
49 import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
50 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
51 import org.onap.policy.controlloop.policy.Policy;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
56 * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
58 public class CdsActorServiceProvider implements Actor {
60 private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
66 public String actor() {
67 return CdsActorConstants.CDS_ACTOR;
71 * {@inheritDoc}. Note: This is a placeholder for now.
74 public List<String> recipes() {
75 return new ArrayList<>();
79 * {@inheritDoc}. Note: This is a placeholder for now.
82 public List<String> recipeTargets(final String recipe) {
83 return new ArrayList<>();
87 * {@inheritDoc}. Note: This is a placeholder for now.
90 public List<String> recipePayloads(final String recipe) {
91 return new ArrayList<>();
95 * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
96 * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
98 * @param onset the event that is reporting the alert for policy to perform an action.
99 * @param operation the control loop operation specifying the actor, operation, target, etc.
100 * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
101 * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
102 * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
104 public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
105 ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
107 // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
108 // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
109 Map<String, String> payload = policy.getPayload();
110 if (!validateCdsMandatoryParams(policy)) {
111 return Optional.empty();
113 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
114 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
116 // Retain only the payload by removing CBA name and version once they are extracted
117 // to be put in CDS request header.
118 payload.remove(CdsActorConstants.KEY_CBA_NAME);
119 payload.remove(CdsActorConstants.KEY_CBA_VERSION);
121 // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
122 String cbaActionName = policy.getRecipe();
123 CdsActionRequest request = new CdsActionRequest();
124 request.setPolicyPayload(payload);
125 request.setActionName(cbaActionName);
126 request.setResolutionKey(UUID.randomUUID().toString());
128 // Inject AAI properties into payload map. Offer flexibility to the usecase
129 // implementation to inject whatever AAI parameters are of interest to them.
130 // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
131 request.setAaiProperties(aaiParams);
133 // Inject any additional event parameters that may be present in the onset event
134 if (onset.getAdditionalEventParams() != null) {
135 request.setAdditionalEventParams(onset.getAdditionalEventParams());
138 Builder struct = Struct.newBuilder();
140 String requestStr = request.generateCdsPayload();
141 Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
142 + "config-deploy-request from payload parameters: {}", payload);
143 JsonFormat.parser().merge(requestStr, struct);
144 } catch (InvalidProtocolBufferException | CoderException e) {
145 LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
146 cbaName, cbaVersion, cbaActionName, e);
147 return Optional.empty();
150 // Build CDS gRPC request common-header
151 CommonHeader commonHeader = CommonHeader.newBuilder()
152 .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
153 .setRequestId(onset.getRequestId().toString())
154 .setSubRequestId(operation.getSubRequestId())
157 // Build CDS gRPC request action-identifier
158 ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
159 .setBlueprintName(cbaName)
160 .setBlueprintVersion(cbaVersion)
161 .setActionName(cbaActionName)
162 .setMode(CdsActorConstants.CDS_MODE)
165 // Finally build the ExecutionServiceInput gRPC request object.
166 ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
167 .setCommonHeader(commonHeader)
168 .setActionIdentifiers(actionIdentifiers)
169 .setPayload(struct.build())
171 return Optional.of(executionServiceInput);
174 private boolean validateCdsMandatoryParams(Policy policy) {
175 if (policy == null || policy.getPayload() == null) {
178 Map<String, String> payload = policy.getPayload();
179 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
180 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
181 String cbaActionName = policy.getRecipe();
182 return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
183 .isNullOrEmpty(cbaActionName);
186 public class CdsActorServiceManager implements CdsProcessorListener {
188 private final AtomicReference<String> cdsStatus = new AtomicReference<>();
194 public void onMessage(final ExecutionServiceOutput message) {
195 LOGGER.info("Received notification from CDS: {}", message);
196 EventType eventType = message.getStatus().getEventType();
198 case EVENT_COMPONENT_FAILURE:
199 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
201 case EVENT_COMPONENT_PROCESSING:
202 cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
204 case EVENT_COMPONENT_EXECUTED:
205 cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
208 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
217 public void onError(final Throwable throwable) {
218 Status status = Status.fromThrowable(throwable);
219 cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
220 LOGGER.error("Failed processing blueprint {} {}", status, throwable);
224 * Send gRPC request to CDS to execute the blueprint.
226 * @param cdsClient CDS grpc client object.
227 * @param cdsProps CDS properties.
228 * @param executionServiceInput a valid CDS grpc request object.
229 * @return the cds response.
231 public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
232 ExecutionServiceInput executionServiceInput) {
234 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
235 // TO-DO: Handle requests asynchronously once the callback support is added to actors.
236 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
237 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
239 cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
241 LOGGER.info("CDS status response {}", getCdsStatus());
242 } catch (InterruptedException ex) {
243 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
244 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
245 Thread.currentThread().interrupt();
247 LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
249 CdsResponse response = new CdsResponse();
250 response.setRequestId(
251 executionServiceInput != null && executionServiceInput.getCommonHeader() != null
252 ? executionServiceInput.getCommonHeader().getRequestId() : null);
253 response.setStatus(this.getCdsStatus());
257 String getCdsStatus() {
258 return cdsStatus.get();