Add actor for CDS
[policy/models.git] / models-interactions / model-actors / actor.cds / src / main / java / org / onap / policy / controlloop / actor / cds / CdsActorServiceProvider.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 Bell Canada. All rights reserved.
4  * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * ============LICENSE_END=========================================================
18  */
19
20 package org.onap.policy.controlloop.actor.cds;
21
22 import com.google.common.base.Preconditions;
23 import com.google.common.base.Strings;
24 import com.google.protobuf.InvalidProtocolBufferException;
25 import com.google.protobuf.Struct;
26 import com.google.protobuf.Struct.Builder;
27 import com.google.protobuf.util.JsonFormat;
28 import io.grpc.Status;
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Optional;
33 import java.util.UUID;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
39 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
41 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
42 import org.onap.policy.cds.CdsResponse;
43 import org.onap.policy.cds.api.CdsProcessorListener;
44 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
45 import org.onap.policy.cds.properties.CdsServerProperties;
46 import org.onap.policy.common.utils.coder.CoderException;
47 import org.onap.policy.controlloop.ControlLoopOperation;
48 import org.onap.policy.controlloop.VirtualControlLoopEvent;
49 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
50 import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
51 import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl;
52 import org.onap.policy.controlloop.policy.Policy;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto
58  * release.
59  */
60 public class CdsActorServiceProvider extends ActorImpl {
61
62     private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
63
64     /**
65      * Constructs the object.
66      */
67     public CdsActorServiceProvider() {
68         super(CdsActorConstants.CDS_ACTOR);
69
70         addOperator(new GrpcOperator(CdsActorConstants.CDS_ACTOR, GrpcOperation.NAME, GrpcOperation::new));
71     }
72
73     // TODO old code: remove lines down to **HERE**
74
75     /**
76      * {@inheritDoc}.
77      */
78     @Override
79     public String actor() {
80         return CdsActorConstants.CDS_ACTOR;
81     }
82
83     /**
84      * {@inheritDoc}. Note: This is a placeholder for now.
85      */
86     @Override
87     public List<String> recipes() {
88         return new ArrayList<>();
89     }
90
91     /**
92      * {@inheritDoc}. Note: This is a placeholder for now.
93      */
94     @Override
95     public List<String> recipeTargets(final String recipe) {
96         return new ArrayList<>();
97     }
98
99     /**
100      * {@inheritDoc}. Note: This is a placeholder for now.
101      */
102     @Override
103     public List<String> recipePayloads(final String recipe) {
104         return new ArrayList<>();
105     }
106
107     /**
108      * Build the CDS ExecutionServiceInput request from the policy object and the AAI
109      * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD
110      * item for Frankfurt release.
111      *
112      * @param onset the event that is reporting the alert for policy to perform an action.
113      * @param operation the control loop operation specifying the actor, operation,
114      *        target, etc.
115      * @param policy the policy specified from the yaml generated by CLAMP or through
116      *        Policy API.
117      * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
118      * @return an Optional ExecutionServiceInput instance if valid else an Optional empty
119      *         object is returned.
120      */
121     public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
122                     ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
123
124         // For the current operational TOSCA policy model (yaml) CBA name and version are
125         // embedded in the payload
126         // section, with the new policy type model being proposed in Frankfurt we will be
127         // able to move it out.
128         Map<String, String> payload = policy.getPayload();
129         if (!validateCdsMandatoryParams(policy)) {
130             return Optional.empty();
131         }
132         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
133         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
134
135         // Retain only the payload by removing CBA name and version once they are
136         // extracted
137         // to be put in CDS request header.
138         payload.remove(CdsActorConstants.KEY_CBA_NAME);
139         payload.remove(CdsActorConstants.KEY_CBA_VERSION);
140
141         // Embed payload from policy to ConfigDeployRequest object, serialize and inject
142         // into grpc request.
143         String cbaActionName = policy.getRecipe();
144         CdsActionRequest request = new CdsActionRequest();
145         request.setPolicyPayload(payload);
146         request.setActionName(cbaActionName);
147         request.setResolutionKey(UUID.randomUUID().toString());
148
149         // Inject AAI properties into payload map. Offer flexibility to the usecase
150         // implementation to inject whatever AAI parameters are of interest to them.
151         // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as
152         // needed by CDS.
153         request.setAaiProperties(aaiParams);
154
155         // Inject any additional event parameters that may be present in the onset event
156         if (onset.getAdditionalEventParams() != null) {
157             request.setAdditionalEventParams(onset.getAdditionalEventParams());
158         }
159
160         Builder struct = Struct.newBuilder();
161         try {
162             String requestStr = request.generateCdsPayload();
163             Preconditions.checkState(!Strings.isNullOrEmpty(requestStr),
164                             "Unable to build " + "config-deploy-request from payload parameters: {}", payload);
165             JsonFormat.parser().merge(requestStr, struct);
166         } catch (InvalidProtocolBufferException | CoderException e) {
167             LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
168                             cbaName, cbaVersion, cbaActionName, e);
169             return Optional.empty();
170         }
171
172         // Build CDS gRPC request common-header
173         CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
174                         .setRequestId(onset.getRequestId().toString()).setSubRequestId(operation.getSubRequestId())
175                         .build();
176
177         // Build CDS gRPC request action-identifier
178         ActionIdentifiers actionIdentifiers =
179                         ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion)
180                                         .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build();
181
182         // Finally build the ExecutionServiceInput gRPC request object.
183         ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader)
184                         .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build();
185         return Optional.of(executionServiceInput);
186     }
187
188     private boolean validateCdsMandatoryParams(Policy policy) {
189         if (policy == null || policy.getPayload() == null) {
190             return false;
191         }
192         Map<String, String> payload = policy.getPayload();
193         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
194         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
195         String cbaActionName = policy.getRecipe();
196         return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion)
197                         && !Strings.isNullOrEmpty(cbaActionName);
198     }
199
200     public class CdsActorServiceManager implements CdsProcessorListener {
201
202         private final AtomicReference<String> cdsStatus = new AtomicReference<>();
203
204         /**
205          * {@inheritDoc}.
206          */
207         @Override
208         public void onMessage(final ExecutionServiceOutput message) {
209             LOGGER.info("Received notification from CDS: {}", message);
210             EventType eventType = message.getStatus().getEventType();
211             switch (eventType) {
212                 case EVENT_COMPONENT_FAILURE:
213                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
214                     break;
215                 case EVENT_COMPONENT_PROCESSING:
216                     cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
217                     break;
218                 case EVENT_COMPONENT_EXECUTED:
219                     cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
220                     break;
221                 default:
222                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
223                     break;
224             }
225         }
226
227         /**
228          * {@inheritDoc}.
229          */
230         @Override
231         public void onError(final Throwable throwable) {
232             Status status = Status.fromThrowable(throwable);
233             cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
234             LOGGER.error("Failed processing blueprint {} {}", status, throwable);
235         }
236
237         /**
238          * Send gRPC request to CDS to execute the blueprint.
239          *
240          * @param cdsClient CDS grpc client object.
241          * @param cdsProps CDS properties.
242          * @param executionServiceInput a valid CDS grpc request object.
243          * @return the cds response.
244          */
245         public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
246                         ExecutionServiceInput executionServiceInput) {
247             try {
248                 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
249                 // TO-DO: Handle requests asynchronously once the callback support is
250                 // added to actors.
251                 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
252                 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
253                 if (!status) {
254                     cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
255                 }
256                 LOGGER.info("CDS status response {}", getCdsStatus());
257             } catch (InterruptedException ex) {
258                 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
259                 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
260                 Thread.currentThread().interrupt();
261             }
262             LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
263
264             CdsResponse response = new CdsResponse();
265             response.setRequestId(executionServiceInput != null && executionServiceInput.getCommonHeader() != null
266                             ? executionServiceInput.getCommonHeader().getRequestId()
267                             : null);
268             response.setStatus(this.getCdsStatus());
269             return response;
270         }
271
272         String getCdsStatus() {
273             return cdsStatus.get();
274         }
275     }
276
277     // **HERE**
278 }