Changes in model to integrate cds actor
[policy/models.git] / models-interactions / model-actors / actor.cds / src / main / java / org / onap / policy / controlloop / actor / cds / CdsActorServiceProvider.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 Bell Canada. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.controlloop.actor.cds;
20
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Strings;
23 import com.google.protobuf.InvalidProtocolBufferException;
24 import com.google.protobuf.Struct;
25 import com.google.protobuf.Struct.Builder;
26 import com.google.protobuf.util.JsonFormat;
27 import io.grpc.Status;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Optional;
32 import java.util.UUID;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicReference;
36 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
41 import org.onap.policy.cds.CdsResponse;
42 import org.onap.policy.cds.api.CdsProcessorListener;
43 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
44 import org.onap.policy.cds.properties.CdsServerProperties;
45 import org.onap.policy.controlloop.ControlLoopOperation;
46 import org.onap.policy.controlloop.VirtualControlLoopEvent;
47 import org.onap.policy.controlloop.actor.cds.beans.CdsActionRequest;
48 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
49 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
50 import org.onap.policy.controlloop.policy.Policy;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
56  */
57 public class CdsActorServiceProvider implements Actor {
58
59     private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
60
61     /**
62      * {@inheritDoc}.
63      */
64     @Override
65     public String actor() {
66         return CdsActorConstants.CDS_ACTOR;
67     }
68
69     /**
70      * {@inheritDoc}. Note: This is a placeholder for now.
71      */
72     @Override
73     public List<String> recipes() {
74         return new ArrayList<>();
75     }
76
77     /**
78      * {@inheritDoc}. Note: This is a placeholder for now.
79      */
80     @Override
81     public List<String> recipeTargets(final String recipe) {
82         return new ArrayList<>();
83     }
84
85     /**
86      * {@inheritDoc}. Note: This is a placeholder for now.
87      */
88     @Override
89     public List<String> recipePayloads(final String recipe) {
90         return new ArrayList<>();
91     }
92
93     /**
94      * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
95      * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
96      *
97      * @param onset the event that is reporting the alert for policy to perform an action.
98      * @param operation the control loop operation specifying the actor, operation, target, etc.
99      * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
100      * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
101      * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
102      */
103     public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
104         ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
105
106         // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
107         // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
108         Map<String, String> payload = policy.getPayload();
109         if (!validateCdsMandatoryParams(policy)) {
110             return Optional.empty();
111         }
112         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
113         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
114         String cbaActionName = policy.getRecipe();
115
116         // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
117         CdsActionRequest request = new CdsActionRequest();
118         request.setConfigDeployProperties(payload);
119         request.setActionName(cbaActionName);
120         request.setResolutionKey(UUID.randomUUID().toString());
121
122         // Inject AAI properties into payload map. Offer flexibility to the usecase
123         // implementation to inject whatever AAI parameters are of interest to them.
124         // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
125         request.setAaiProperties(aaiParams);
126
127         Builder struct = Struct.newBuilder();
128         try {
129             String requestStr = request.toString();
130             Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
131                 + "config-deploy-request from payload parameters: {}", payload);
132             JsonFormat.parser().merge(requestStr, struct);
133         } catch (InvalidProtocolBufferException e) {
134             LOGGER.error("Failed to parse received message. blueprint({}:{}) for action({})", cbaName, cbaVersion,
135                 cbaActionName, e);
136             return Optional.empty();
137         }
138
139         // Build CDS gRPC request common-header
140         CommonHeader commonHeader = CommonHeader.newBuilder()
141             .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
142             .setRequestId(onset.getRequestId().toString())
143             .setSubRequestId(operation.getSubRequestId())
144             .build();
145
146         // Build CDS gRPC request action-identifier
147         ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
148             .setBlueprintName(cbaName)
149             .setBlueprintVersion(cbaVersion)
150             .setActionName(cbaActionName)
151             .setMode(CdsActorConstants.CDS_MODE)
152             .build();
153
154         // Finally build the ExecutionServiceInput gRPC request object.
155         ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
156             .setCommonHeader(commonHeader)
157             .setActionIdentifiers(actionIdentifiers)
158             .setPayload(struct.build())
159             .build();
160         return Optional.of(executionServiceInput);
161     }
162
163     private boolean validateCdsMandatoryParams(Policy policy) {
164         if (policy == null || policy.getPayload() == null) {
165             return false;
166         }
167         Map<String, String> payload = policy.getPayload();
168         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
169         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
170         String cbaActionName = policy.getRecipe();
171         return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
172             .isNullOrEmpty(cbaActionName);
173     }
174
175     public class CdsActorServiceManager implements CdsProcessorListener {
176
177         private final AtomicReference<String> cdsStatus = new AtomicReference<>();
178
179         /**
180          * {@inheritDoc}.
181          */
182         @Override
183         public void onMessage(final ExecutionServiceOutput message) {
184             LOGGER.info("Received notification from CDS: {}", message);
185             EventType eventType = message.getStatus().getEventType();
186             switch (eventType) {
187                 case EVENT_COMPONENT_FAILURE:
188                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
189                     break;
190                 case EVENT_COMPONENT_PROCESSING:
191                     cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
192                     break;
193                 case EVENT_COMPONENT_EXECUTED:
194                     cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
195                     break;
196                 default:
197                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
198                     break;
199             }
200         }
201
202         /**
203          * {@inheritDoc}.
204          */
205         @Override
206         public void onError(final Throwable throwable) {
207             Status status = Status.fromThrowable(throwable);
208             cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
209             LOGGER.error("Failed processing blueprint {} {}", status, throwable);
210         }
211
212         /**
213          * Send gRPC request to CDS to execute the blueprint.
214          *
215          * @param cdsClient CDS grpc client object.
216          * @param cdsProps CDS properties.
217          * @param executionServiceInput a valid CDS grpc request object.
218          * @return the cds response.
219          */
220         public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
221                                             ExecutionServiceInput executionServiceInput) {
222             try {
223                 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
224                 // TO-DO: Handle requests asynchronously once the callback support is added to actors.
225                 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
226                 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
227                 if (!status) {
228                     cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
229                 }
230                 LOGGER.info("CDS status response {}", getCdsStatus());
231             } catch (InterruptedException ex) {
232                 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
233                 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
234                 Thread.currentThread().interrupt();
235             }
236             LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
237
238             CdsResponse response = new CdsResponse();
239             response.setRequestId(
240                     executionServiceInput != null && executionServiceInput.getCommonHeader() != null
241                         ? executionServiceInput.getCommonHeader().getRequestId() : null);
242             response.setStatus(this.getCdsStatus());
243             return response;
244         }
245
246         String getCdsStatus() {
247             return cdsStatus.get();
248         }
249     }
250 }