2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Bell Canada. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.controlloop.actor.cds;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Struct;
import com.google.protobuf.Struct.Builder;
import com.google.protobuf.util.JsonFormat;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
import org.onap.policy.cds.api.CdsProcessorListener;
import org.onap.policy.cds.client.CdsProcessorGrpcClient;
import org.onap.policy.cds.properties.CdsServerProperties;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
import org.onap.policy.controlloop.actor.cds.beans.CdsActionRequest;
import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
import org.onap.policy.controlloop.policy.Policy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
54 * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
56 public class CdsActorServiceProvider implements Actor {
58 private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
64 public String actor() {
65 return CdsActorConstants.CDS_ACTOR;
69 * {@inheritDoc}. Note: This is a placeholder for now.
72 public List<String> recipes() {
73 return new ArrayList<>();
77 * {@inheritDoc}. Note: This is a placeholder for now.
80 public List<String> recipeTargets(final String recipe) {
81 return new ArrayList<>();
85 * {@inheritDoc}. Note: This is a placeholder for now.
88 public List<String> recipePayloads(final String recipe) {
89 return new ArrayList<>();
93 * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
94 * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
96 * @param onset the event that is reporting the alert for policy to perform an action.
97 * @param operation the control loop operation specifying the actor, operation, target, etc.
98 * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
99 * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
100 * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
102 public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
103 ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
105 // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
106 // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
107 Map<String, String> payload = policy.getPayload();
108 if (!validateCdsMandatoryParams(policy)) {
109 return Optional.empty();
111 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
112 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
113 String cbaActionName = policy.getRecipe();
115 // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
116 CdsActionRequest request = new CdsActionRequest();
117 request.setConfigDeployProperties(payload);
118 request.setActionName(cbaActionName);
119 request.setResolutionKey(UUID.randomUUID().toString());
121 // Inject AAI properties into payload map. Offer flexibility to the usecase
122 // implementation to inject whatever AAI parameters are of interest to them.
123 // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
124 request.setAaiProperties(aaiParams);
126 Builder struct = Struct.newBuilder();
128 String requestStr = request.toString();
129 Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
130 + "config-deploy-request from payload parameters: {}", payload);
131 JsonFormat.parser().merge(requestStr, struct);
132 } catch (InvalidProtocolBufferException e) {
133 LOGGER.error("Failed to parse received message. blueprint({}:{}) for action({})", cbaName, cbaVersion,
135 return Optional.empty();
138 // Build CDS gRPC request common-header
139 CommonHeader commonHeader = CommonHeader.newBuilder()
140 .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
141 .setRequestId(onset.getRequestId().toString())
142 .setSubRequestId(operation.getSubRequestId())
145 // Build CDS gRPC request action-identifier
146 ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
147 .setBlueprintName(cbaName)
148 .setBlueprintVersion(cbaVersion)
149 .setActionName(cbaActionName)
150 .setMode(CdsActorConstants.CDS_MODE)
153 // Finally build the ExecutionServiceInput gRPC request object.
154 ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
155 .setCommonHeader(commonHeader)
156 .setActionIdentifiers(actionIdentifiers)
157 .setPayload(struct.build())
159 return Optional.of(executionServiceInput);
162 private boolean validateCdsMandatoryParams(Policy policy) {
163 if (policy == null || policy.getPayload() == null) {
166 Map<String, String> payload = policy.getPayload();
167 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
168 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
169 String cbaActionName = policy.getRecipe();
170 return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
171 .isNullOrEmpty(cbaActionName);
174 class CdsActorServiceManager implements CdsProcessorListener {
176 private final AtomicReference<String> cdsResponse = new AtomicReference<>();
182 public void onMessage(final ExecutionServiceOutput message) {
183 LOGGER.info("Received notification from CDS: {}", message);
184 EventType eventType = message.getStatus().getEventType();
186 case EVENT_COMPONENT_FAILURE:
187 cdsResponse.compareAndSet(null, CdsActorConstants.FAILED);
189 case EVENT_COMPONENT_PROCESSING:
190 cdsResponse.compareAndSet(null, CdsActorConstants.PROCESSING);
192 case EVENT_COMPONENT_EXECUTED:
193 cdsResponse.compareAndSet(null, CdsActorConstants.SUCCESS);
196 cdsResponse.compareAndSet(null, CdsActorConstants.FAILED);
205 public void onError(final Throwable throwable) {
206 Status status = Status.fromThrowable(throwable);
207 cdsResponse.compareAndSet(null, CdsActorConstants.ERROR);
208 LOGGER.error("Failed processing blueprint {} {}", status, throwable);
212 * Send gRPC request to CDS to execute the blueprint.
214 * @param cdsClient CDS grpc client object.
215 * @param cdsProps CDS properties.
216 * @param executionServiceInput a valid CDS grpc request object.
217 * @return Status of the CDS request, null if timeout happens or onError is invoked for any reason.
219 public String sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
220 ExecutionServiceInput executionServiceInput) {
222 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
223 // TO-DO: Handle requests asynchronously once the callback support is added to actors.
224 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
225 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
227 cdsResponse.compareAndSet(null, CdsActorConstants.TIMED_OUT);
229 LOGGER.info("CDS response {}", getCdsResponse());
230 } catch (InterruptedException ex) {
231 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
232 cdsResponse.compareAndSet(null, CdsActorConstants.INTERRUPTED);
233 Thread.currentThread().interrupt();
235 LOGGER.info("Status of the CDS gRPC request is: {}", getCdsResponse());
236 return getCdsResponse();
239 String getCdsResponse() {
240 return cdsResponse.get();