2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2020 Bell Canada. All rights reserved.
4 * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
20 package org.onap.policy.controlloop.actor.cds;
22 import com.google.common.base.Preconditions;
23 import com.google.common.base.Strings;
24 import com.google.protobuf.InvalidProtocolBufferException;
25 import com.google.protobuf.Struct;
26 import com.google.protobuf.Struct.Builder;
27 import com.google.protobuf.util.JsonFormat;
28 import io.grpc.Status;
29 import java.util.ArrayList;
30 import java.util.List;
32 import java.util.Optional;
33 import java.util.UUID;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
39 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
41 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
42 import org.onap.policy.cds.CdsResponse;
43 import org.onap.policy.cds.api.CdsProcessorListener;
44 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
45 import org.onap.policy.cds.properties.CdsServerProperties;
46 import org.onap.policy.common.utils.coder.CoderException;
47 import org.onap.policy.controlloop.ControlLoopOperation;
48 import org.onap.policy.controlloop.VirtualControlLoopEvent;
49 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
50 import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
51 import org.onap.policy.controlloop.actorserviceprovider.Operator;
52 import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl;
53 import org.onap.policy.controlloop.policy.Policy;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
58 * CDS is an unusual actor in that it uses a single, generic operator to initiate all
59 * operation types. The action taken is always the same, only the operation name changes.
61 public class CdsActorServiceProvider extends ActorImpl {
63 private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
66 * Constructs the object.
68 public CdsActorServiceProvider() {
69 super(CdsActorConstants.CDS_ACTOR);
71 addOperator(new GrpcOperator(CdsActorConstants.CDS_ACTOR, GrpcOperation.NAME, GrpcOperation::new));
75 public Operator getOperator(String name) {
77 * All operations are managed by the same operator, regardless of the name.
79 return super.getOperator(GrpcOperation.NAME);
82 // TODO old code: remove lines down to **HERE**
88 public String actor() {
89 return CdsActorConstants.CDS_ACTOR;
93 * {@inheritDoc}. Note: This is a placeholder for now.
96 public List<String> recipes() {
97 return new ArrayList<>();
101 * {@inheritDoc}. Note: This is a placeholder for now.
104 public List<String> recipeTargets(final String recipe) {
105 return new ArrayList<>();
109 * {@inheritDoc}. Note: This is a placeholder for now.
112 public List<String> recipePayloads(final String recipe) {
113 return new ArrayList<>();
117 * Build the CDS ExecutionServiceInput request from the policy object and the AAI
118 * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD
119 * item for Frankfurt release.
121 * @param onset the event that is reporting the alert for policy to perform an action.
122 * @param operation the control loop operation specifying the actor, operation,
124 * @param policy the policy specified from the yaml generated by CLAMP or through
126 * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
127 * @return an Optional ExecutionServiceInput instance if valid else an Optional empty
128 * object is returned.
130 public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
131 ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
133 // For the current operational TOSCA policy model (yaml) CBA name and version are
134 // embedded in the payload
135 // section, with the new policy type model being proposed in Frankfurt we will be
136 // able to move it out.
137 Map<String, String> payload = policy.getPayload();
138 if (!validateCdsMandatoryParams(policy)) {
139 return Optional.empty();
141 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
142 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
144 // Retain only the payload by removing CBA name and version once they are
146 // to be put in CDS request header.
147 payload.remove(CdsActorConstants.KEY_CBA_NAME);
148 payload.remove(CdsActorConstants.KEY_CBA_VERSION);
150 // Embed payload from policy to ConfigDeployRequest object, serialize and inject
151 // into grpc request.
152 String cbaActionName = policy.getRecipe();
153 CdsActionRequest request = new CdsActionRequest();
154 request.setPolicyPayload(payload);
155 request.setActionName(cbaActionName);
156 request.setResolutionKey(UUID.randomUUID().toString());
158 // Inject AAI properties into payload map. Offer flexibility to the usecase
159 // implementation to inject whatever AAI parameters are of interest to them.
160 // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as
162 request.setAaiProperties(aaiParams);
164 // Inject any additional event parameters that may be present in the onset event
165 if (onset.getAdditionalEventParams() != null) {
166 request.setAdditionalEventParams(onset.getAdditionalEventParams());
169 Builder struct = Struct.newBuilder();
171 String requestStr = request.generateCdsPayload();
172 Preconditions.checkState(!Strings.isNullOrEmpty(requestStr),
173 "Unable to build " + "config-deploy-request from payload parameters: {}", payload);
174 JsonFormat.parser().merge(requestStr, struct);
175 } catch (InvalidProtocolBufferException | CoderException e) {
176 LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
177 cbaName, cbaVersion, cbaActionName, e);
178 return Optional.empty();
181 // Build CDS gRPC request common-header
182 CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
183 .setRequestId(onset.getRequestId().toString()).setSubRequestId(operation.getSubRequestId())
186 // Build CDS gRPC request action-identifier
187 ActionIdentifiers actionIdentifiers =
188 ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion)
189 .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build();
191 // Finally build the ExecutionServiceInput gRPC request object.
192 ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader)
193 .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build();
194 return Optional.of(executionServiceInput);
197 private boolean validateCdsMandatoryParams(Policy policy) {
198 if (policy == null || policy.getPayload() == null) {
201 Map<String, String> payload = policy.getPayload();
202 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
203 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
204 String cbaActionName = policy.getRecipe();
205 return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion)
206 && !Strings.isNullOrEmpty(cbaActionName);
209 public class CdsActorServiceManager implements CdsProcessorListener {
211 private final AtomicReference<String> cdsStatus = new AtomicReference<>();
217 public void onMessage(final ExecutionServiceOutput message) {
218 LOGGER.info("Received notification from CDS: {}", message);
219 EventType eventType = message.getStatus().getEventType();
221 case EVENT_COMPONENT_FAILURE:
222 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
224 case EVENT_COMPONENT_PROCESSING:
225 cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
227 case EVENT_COMPONENT_EXECUTED:
228 cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
231 cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
240 public void onError(final Throwable throwable) {
241 Status status = Status.fromThrowable(throwable);
242 cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
243 LOGGER.error("Failed processing blueprint {}", status, throwable);
247 * Send gRPC request to CDS to execute the blueprint.
249 * @param cdsClient CDS grpc client object.
250 * @param cdsProps CDS properties.
251 * @param executionServiceInput a valid CDS grpc request object.
252 * @return the cds response.
254 public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
255 ExecutionServiceInput executionServiceInput) {
257 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
258 // TO-DO: Handle requests asynchronously once the callback support is
260 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
261 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
263 cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
265 LOGGER.info("CDS status response {}", getCdsStatus());
266 } catch (InterruptedException ex) {
267 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
268 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
269 Thread.currentThread().interrupt();
271 LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
273 CdsResponse response = new CdsResponse();
274 response.setRequestId(executionServiceInput != null && executionServiceInput.getCommonHeader() != null
275 ? executionServiceInput.getCommonHeader().getRequestId()
277 response.setStatus(this.getCdsStatus());
281 String getCdsStatus() {
282 return cdsStatus.get();