2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019 Bell Canada. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.controlloop.actor.cds;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Strings;
23 import com.google.protobuf.InvalidProtocolBufferException;
24 import com.google.protobuf.Struct;
25 import com.google.protobuf.Struct.Builder;
26 import com.google.protobuf.util.JsonFormat;
27 import io.grpc.Status;
28 import java.util.ArrayList;
29 import java.util.List;
31 import java.util.Optional;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicReference;
35 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
36 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
38 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
40 import org.onap.policy.cds.api.CdsProcessorListener;
41 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
42 import org.onap.policy.cds.properties.CdsServerProperties;
43 import org.onap.policy.controlloop.ControlLoopOperation;
44 import org.onap.policy.controlloop.VirtualControlLoopEvent;
45 import org.onap.policy.controlloop.actor.cds.beans.ConfigDeployRequest;
46 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
47 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
48 import org.onap.policy.controlloop.policy.Policy;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
53 * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
55 public class CdsActorServiceProvider implements Actor {
57 private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
63 public String actor() {
64 return CdsActorConstants.CDS_ACTOR;
68 * {@inheritDoc}. Note: This is a placeholder for now.
71 public List<String> recipes() {
72 return new ArrayList<>();
76 * {@inheritDoc}. Note: This is a placeholder for now.
79 public List<String> recipeTargets(final String recipe) {
80 return new ArrayList<>();
84 * {@inheritDoc}. Note: This is a placeholder for now.
87 public List<String> recipePayloads(final String recipe) {
88 return new ArrayList<>();
92 * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
93 * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
95 * @param onset the event that is reporting the alert for policy to perform an action.
96 * @param operation the control loop operation specifying the actor, operation, target, etc.
97 * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
98 * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
99 * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
101 public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
102 ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
104 // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
105 // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
106 Map<String, String> payload = policy.getPayload();
107 if (!validateCdsMandatoryParams(policy)) {
108 return Optional.empty();
110 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
111 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
112 String cbaActionName = policy.getRecipe();
114 // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
115 ConfigDeployRequest request = new ConfigDeployRequest();
116 request.setConfigDeployProperties(payload);
118 // Inject AAI properties into payload map. Offer flexibility to the usecase
119 // implementation to inject whatever AAI parameters are of interest to them.
120 // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
121 request.setAaiProperties(aaiParams);
123 Builder struct = Struct.newBuilder();
125 String requestStr = request.toString();
126 Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
127 + "config-deploy-request from payload parameters: {}", payload);
128 JsonFormat.parser().merge(requestStr, struct);
129 } catch (InvalidProtocolBufferException e) {
130 LOGGER.error("Failed to parse received message. blueprint({}:{}) for action({})", cbaName, cbaVersion,
132 return Optional.empty();
135 // Build CDS gRPC request common-header
136 CommonHeader commonHeader = CommonHeader.newBuilder()
137 .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
138 .setRequestId(onset.getRequestId().toString())
139 .setSubRequestId(operation.getSubRequestId())
142 // Build CDS gRPC request action-identifier
143 ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
144 .setBlueprintName(cbaName)
145 .setBlueprintVersion(cbaVersion)
146 .setActionName(cbaActionName)
147 .setMode(CdsActorConstants.CDS_MODE)
150 // Finally build the ExecutionServiceInput gRPC request object.
151 ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
152 .setCommonHeader(commonHeader)
153 .setActionIdentifiers(actionIdentifiers)
154 .setPayload(struct.build())
156 return Optional.of(executionServiceInput);
159 private boolean validateCdsMandatoryParams(Policy policy) {
160 if (policy == null || policy.getPayload() == null) {
163 Map<String, String> payload = policy.getPayload();
164 String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
165 String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
166 String cbaActionName = policy.getRecipe();
167 return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
168 .isNullOrEmpty(cbaActionName);
171 class CdsActorServiceManager implements CdsProcessorListener {
173 private final AtomicReference<String> cdsResponse = new AtomicReference<>();
179 public void onMessage(final ExecutionServiceOutput message) {
180 LOGGER.info("Received notification from CDS: {}", message);
181 EventType eventType = message.getStatus().getEventType();
183 case EVENT_COMPONENT_FAILURE:
184 cdsResponse.compareAndSet(null, CdsActorConstants.FAILED);
186 case EVENT_COMPONENT_PROCESSING:
187 cdsResponse.compareAndSet(null, CdsActorConstants.PROCESSING);
189 case EVENT_COMPONENT_EXECUTED:
190 cdsResponse.compareAndSet(null, CdsActorConstants.SUCCESS);
193 cdsResponse.compareAndSet(null, CdsActorConstants.FAILED);
202 public void onError(final Throwable throwable) {
203 Status status = Status.fromThrowable(throwable);
204 cdsResponse.compareAndSet(null, CdsActorConstants.ERROR);
205 LOGGER.error("Failed processing blueprint {} {}", status, throwable);
209 * Send gRPC request to CDS to execute the blueprint.
211 * @param cdsClient CDS grpc client object.
212 * @param cdsProps CDS properties.
213 * @param executionServiceInput a valid CDS grpc request object.
214 * @return Status of the CDS request, null if timeout happens or onError is invoked for any reason.
216 public String sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
217 ExecutionServiceInput executionServiceInput) {
219 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
220 // TO-DO: Handle requests asynchronously once the callback support is added to actors.
221 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
222 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
224 cdsResponse.compareAndSet(null, CdsActorConstants.TIMED_OUT);
226 LOGGER.info("CDS response {}", getCdsResponse());
227 } catch (InterruptedException ex) {
228 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
229 cdsResponse.compareAndSet(null, CdsActorConstants.INTERRUPTED);
230 Thread.currentThread().interrupt();
232 LOGGER.info("Status of the CDS gRPC request is: {}", getCdsResponse());
233 return getCdsResponse();
236 String getCdsResponse() {
237 return cdsResponse.get();