05ff02e5cf32c81e4c7d54184ab48381a0e46613
[policy/models.git] / models-interactions / model-actors / actor.cds / src / main / java / org / onap / policy / controlloop / actor / cds / CdsActorServiceProvider.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 Bell Canada. All rights reserved.
4  * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * ============LICENSE_END=========================================================
18  */
19
20 package org.onap.policy.controlloop.actor.cds;
21
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Struct;
import com.google.protobuf.Struct.Builder;
import com.google.protobuf.util.JsonFormat;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
import org.onap.policy.cds.CdsResponse;
import org.onap.policy.cds.api.CdsProcessorListener;
import org.onap.policy.cds.client.CdsProcessorGrpcClient;
import org.onap.policy.cds.properties.CdsServerProperties;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl;
import org.onap.policy.controlloop.policy.Policy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
55
56 /**
57  * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
58  */
59 public class CdsActorServiceProvider extends ActorImpl {
60
61     private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
62
63     public CdsActorServiceProvider() {
64         super(CdsActorConstants.CDS_ACTOR);
65     }
66
67     /**
68      * {@inheritDoc}.
69      */
70     @Override
71     public String actor() {
72         return CdsActorConstants.CDS_ACTOR;
73     }
74
75     /**
76      * {@inheritDoc}. Note: This is a placeholder for now.
77      */
78     @Override
79     public List<String> recipes() {
80         return new ArrayList<>();
81     }
82
83     /**
84      * {@inheritDoc}. Note: This is a placeholder for now.
85      */
86     @Override
87     public List<String> recipeTargets(final String recipe) {
88         return new ArrayList<>();
89     }
90
91     /**
92      * {@inheritDoc}. Note: This is a placeholder for now.
93      */
94     @Override
95     public List<String> recipePayloads(final String recipe) {
96         return new ArrayList<>();
97     }
98
99     /**
100      * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
101      * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
102      *
103      * @param onset the event that is reporting the alert for policy to perform an action.
104      * @param operation the control loop operation specifying the actor, operation, target, etc.
105      * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
106      * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
107      * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
108      */
109     public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
110         ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
111
112         // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
113         // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
114         Map<String, String> payload = policy.getPayload();
115         if (!validateCdsMandatoryParams(policy)) {
116             return Optional.empty();
117         }
118         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
119         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
120
121         // Retain only the payload by removing CBA name and version once they are extracted
122         // to be put in CDS request header.
123         payload.remove(CdsActorConstants.KEY_CBA_NAME);
124         payload.remove(CdsActorConstants.KEY_CBA_VERSION);
125
126         // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
127         String cbaActionName = policy.getRecipe();
128         CdsActionRequest request = new CdsActionRequest();
129         request.setPolicyPayload(payload);
130         request.setActionName(cbaActionName);
131         request.setResolutionKey(UUID.randomUUID().toString());
132
133         // Inject AAI properties into payload map. Offer flexibility to the usecase
134         // implementation to inject whatever AAI parameters are of interest to them.
135         // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
136         request.setAaiProperties(aaiParams);
137
138         // Inject any additional event parameters that may be present in the onset event
139         if (onset.getAdditionalEventParams() != null) {
140             request.setAdditionalEventParams(onset.getAdditionalEventParams());
141         }
142
143         Builder struct = Struct.newBuilder();
144         try {
145             String requestStr = request.generateCdsPayload();
146             Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
147                 + "config-deploy-request from payload parameters: {}", payload);
148             JsonFormat.parser().merge(requestStr, struct);
149         } catch (InvalidProtocolBufferException | CoderException e) {
150             LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
151                     cbaName, cbaVersion, cbaActionName, e);
152             return Optional.empty();
153         }
154
155         // Build CDS gRPC request common-header
156         CommonHeader commonHeader = CommonHeader.newBuilder()
157             .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
158             .setRequestId(onset.getRequestId().toString())
159             .setSubRequestId(operation.getSubRequestId())
160             .build();
161
162         // Build CDS gRPC request action-identifier
163         ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
164             .setBlueprintName(cbaName)
165             .setBlueprintVersion(cbaVersion)
166             .setActionName(cbaActionName)
167             .setMode(CdsActorConstants.CDS_MODE)
168             .build();
169
170         // Finally build the ExecutionServiceInput gRPC request object.
171         ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
172             .setCommonHeader(commonHeader)
173             .setActionIdentifiers(actionIdentifiers)
174             .setPayload(struct.build())
175             .build();
176         return Optional.of(executionServiceInput);
177     }
178
179     private boolean validateCdsMandatoryParams(Policy policy) {
180         if (policy == null || policy.getPayload() == null) {
181             return false;
182         }
183         Map<String, String> payload = policy.getPayload();
184         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
185         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
186         String cbaActionName = policy.getRecipe();
187         return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
188             .isNullOrEmpty(cbaActionName);
189     }
190
191     public class CdsActorServiceManager implements CdsProcessorListener {
192
193         private final AtomicReference<String> cdsStatus = new AtomicReference<>();
194
195         /**
196          * {@inheritDoc}.
197          */
198         @Override
199         public void onMessage(final ExecutionServiceOutput message) {
200             LOGGER.info("Received notification from CDS: {}", message);
201             EventType eventType = message.getStatus().getEventType();
202             switch (eventType) {
203                 case EVENT_COMPONENT_FAILURE:
204                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
205                     break;
206                 case EVENT_COMPONENT_PROCESSING:
207                     cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
208                     break;
209                 case EVENT_COMPONENT_EXECUTED:
210                     cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
211                     break;
212                 default:
213                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
214                     break;
215             }
216         }
217
218         /**
219          * {@inheritDoc}.
220          */
221         @Override
222         public void onError(final Throwable throwable) {
223             Status status = Status.fromThrowable(throwable);
224             cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
225             LOGGER.error("Failed processing blueprint {} {}", status, throwable);
226         }
227
228         /**
229          * Send gRPC request to CDS to execute the blueprint.
230          *
231          * @param cdsClient CDS grpc client object.
232          * @param cdsProps CDS properties.
233          * @param executionServiceInput a valid CDS grpc request object.
234          * @return the cds response.
235          */
236         public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
237                                             ExecutionServiceInput executionServiceInput) {
238             try {
239                 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
240                 // TO-DO: Handle requests asynchronously once the callback support is added to actors.
241                 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
242                 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
243                 if (!status) {
244                     cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
245                 }
246                 LOGGER.info("CDS status response {}", getCdsStatus());
247             } catch (InterruptedException ex) {
248                 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
249                 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
250                 Thread.currentThread().interrupt();
251             }
252             LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
253
254             CdsResponse response = new CdsResponse();
255             response.setRequestId(
256                     executionServiceInput != null && executionServiceInput.getCommonHeader() != null
257                         ? executionServiceInput.getCommonHeader().getRequestId() : null);
258             response.setStatus(this.getCdsStatus());
259             return response;
260         }
261
262         String getCdsStatus() {
263             return cdsStatus.get();
264         }
265     }
266 }