2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Bell Canada. All rights reserved.
4 * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
20 package org.onap.policy.controlloop.actor.cds;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Struct;
import com.google.protobuf.Struct.Builder;
import com.google.protobuf.util.JsonFormat;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
import org.onap.policy.cds.CdsResponse;
import org.onap.policy.cds.api.CdsProcessorListener;
import org.onap.policy.cds.client.CdsProcessorGrpcClient;
import org.onap.policy.cds.properties.CdsServerProperties;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl;
import org.onap.policy.controlloop.policy.Policy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
57 * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto
60 public class CdsActorServiceProvider extends ActorImpl {
62 private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
65 * Constructs the object.
67 public CdsActorServiceProvider() {
68 super(CdsActorConstants.CDS_ACTOR);
70 addOperator(new GrpcOperator(CdsActorConstants.CDS_ACTOR, GrpcOperation.NAME, GrpcOperation::new));
73 // TODO old code: remove lines down to **HERE**
79 public String actor() {
80 return CdsActorConstants.CDS_ACTOR;
84 * {@inheritDoc}. Note: This is a placeholder for now.
87 public List<String> recipes() {
88 return new ArrayList<>();
92 * {@inheritDoc}. Note: This is a placeholder for now.
95 public List<String> recipeTargets(final String recipe) {
96 return new ArrayList<>();
100 * {@inheritDoc}. Note: This is a placeholder for now.
103 public List<String> recipePayloads(final String recipe) {
104 return new ArrayList<>();
108 * Build the CDS ExecutionServiceInput request from the policy object and the AAI
109 * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD
110 * item for Frankfurt release.
112 * @param onset the event that is reporting the alert for policy to perform an action.
113 * @param operation the control loop operation specifying the actor, operation,
115 * @param policy the policy specified from the yaml generated by CLAMP or through
117 * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
118 * @return an Optional ExecutionServiceInput instance if valid else an Optional empty
119 * object is returned.
121 public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
122 ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
124 // For the current operational TOSCA policy model (yaml) CBA name and version are
125 // embedded in the payload
126 // section, with the new policy type model being proposed in Frankfurt we will be
127 // able to move it out.
128 Map<String, String> payload = policy.getPayload();
129 if (!validateCdsMandatoryParams(policy)) {
130 return Optional.empty();
132 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
133 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
135 // Retain only the payload by removing CBA name and version once they are
137 // to be put in CDS request header.
138 payload.remove(CdsActorConstants.KEY_CBA_NAME);
139 payload.remove(CdsActorConstants.KEY_CBA_VERSION);
141 // Embed payload from policy to ConfigDeployRequest object, serialize and inject
142 // into grpc request.
143 String cbaActionName = policy.getRecipe();
144 CdsActionRequest request = new CdsActionRequest();
145 request.setPolicyPayload(payload);
146 request.setActionName(cbaActionName);
147 request.setResolutionKey(UUID.randomUUID().toString());
149 // Inject AAI properties into payload map. Offer flexibility to the usecase
150 // implementation to inject whatever AAI parameters are of interest to them.
151 // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as
153 request.setAaiProperties(aaiParams);
155 // Inject any additional event parameters that may be present in the onset event
156 if (onset.getAdditionalEventParams() != null) {
157 request.setAdditionalEventParams(onset.getAdditionalEventParams());
160 Builder struct = Struct.newBuilder();
162 String requestStr = request.generateCdsPayload();
163 Preconditions.checkState(!Strings.isNullOrEmpty(requestStr),
164 "Unable to build " + "config-deploy-request from payload parameters: {}", payload);
165 JsonFormat.parser().merge(requestStr, struct);
166 } catch (InvalidProtocolBufferException | CoderException e) {
167 LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
168 cbaName, cbaVersion, cbaActionName, e);
169 return Optional.empty();
172 // Build CDS gRPC request common-header
173 CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
174 .setRequestId(onset.getRequestId().toString()).setSubRequestId(operation.getSubRequestId())
177 // Build CDS gRPC request action-identifier
178 ActionIdentifiers actionIdentifiers =
179 ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion)
180 .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build();
182 // Finally build the ExecutionServiceInput gRPC request object.
183 ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader)
184 .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build();
185 return Optional.of(executionServiceInput);
188 private boolean validateCdsMandatoryParams(Policy policy) {
189 if (policy == null || policy.getPayload() == null) {
192 Map<String, String> payload = policy.getPayload();
193 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
194 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
195 String cbaActionName = policy.getRecipe();
196 return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion)
197 && !Strings.isNullOrEmpty(cbaActionName);
200 public class CdsActorServiceManager implements CdsProcessorListener {
202 private final AtomicReference<String> cdsStatus = new AtomicReference<>();
208 public void onMessage(final ExecutionServiceOutput message) {
209 LOGGER.info("Received notification from CDS: {}", message);
210 EventType eventType = message.getStatus().getEventType();
212 case EVENT_COMPONENT_FAILURE:
213 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
215 case EVENT_COMPONENT_PROCESSING:
216 cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
218 case EVENT_COMPONENT_EXECUTED:
219 cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
222 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
231 public void onError(final Throwable throwable) {
232 Status status = Status.fromThrowable(throwable);
233 cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
234 LOGGER.error("Failed processing blueprint {} {}", status, throwable);
238 * Send gRPC request to CDS to execute the blueprint.
240 * @param cdsClient CDS grpc client object.
241 * @param cdsProps CDS properties.
242 * @param executionServiceInput a valid CDS grpc request object.
243 * @return the cds response.
245 public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
246 ExecutionServiceInput executionServiceInput) {
248 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
249 // TO-DO: Handle requests asynchronously once the callback support is
251 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
252 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
254 cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
256 LOGGER.info("CDS status response {}", getCdsStatus());
257 } catch (InterruptedException ex) {
258 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
259 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
260 Thread.currentThread().interrupt();
262 LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
264 CdsResponse response = new CdsResponse();
265 response.setRequestId(executionServiceInput != null && executionServiceInput.getCommonHeader() != null
266 ? executionServiceInput.getCommonHeader().getRequestId()
268 response.setStatus(this.getCdsStatus());
272 String getCdsStatus() {
273 return cdsStatus.get();