1ad184e497218c06424891ebcdc6b5e024dc4d17
[policy/models.git] / models-interactions / model-actors / actor.cds / src / main / java / org / onap / policy / controlloop / actor / cds / CdsActorServiceProvider.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 Bell Canada. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.controlloop.actor.cds;
20
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Strings;
23 import com.google.protobuf.InvalidProtocolBufferException;
24 import com.google.protobuf.Struct;
25 import com.google.protobuf.Struct.Builder;
26 import com.google.protobuf.util.JsonFormat;
27 import io.grpc.Status;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Optional;
32 import java.util.UUID;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicReference;
36 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
41 import org.onap.policy.cds.api.CdsProcessorListener;
42 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
43 import org.onap.policy.cds.properties.CdsServerProperties;
44 import org.onap.policy.controlloop.ControlLoopOperation;
45 import org.onap.policy.controlloop.VirtualControlLoopEvent;
46 import org.onap.policy.controlloop.actor.cds.beans.CdsActionRequest;
47 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
48 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
49 import org.onap.policy.controlloop.policy.Policy;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
55  */
56 public class CdsActorServiceProvider implements Actor {
57
58     private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
59
60     /**
61      * {@inheritDoc}.
62      */
63     @Override
64     public String actor() {
65         return CdsActorConstants.CDS_ACTOR;
66     }
67
68     /**
69      * {@inheritDoc}. Note: This is a placeholder for now.
70      */
71     @Override
72     public List<String> recipes() {
73         return new ArrayList<>();
74     }
75
76     /**
77      * {@inheritDoc}. Note: This is a placeholder for now.
78      */
79     @Override
80     public List<String> recipeTargets(final String recipe) {
81         return new ArrayList<>();
82     }
83
84     /**
85      * {@inheritDoc}. Note: This is a placeholder for now.
86      */
87     @Override
88     public List<String> recipePayloads(final String recipe) {
89         return new ArrayList<>();
90     }
91
92     /**
93      * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
94      * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
95      *
96      * @param onset the event that is reporting the alert for policy to perform an action.
97      * @param operation the control loop operation specifying the actor, operation, target, etc.
98      * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
99      * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
100      * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
101      */
102     public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
103         ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
104
105         // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
106         // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
107         Map<String, String> payload = policy.getPayload();
108         if (!validateCdsMandatoryParams(policy)) {
109             return Optional.empty();
110         }
111         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
112         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
113         String cbaActionName = policy.getRecipe();
114
115         // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
116         CdsActionRequest request = new CdsActionRequest();
117         request.setConfigDeployProperties(payload);
118         request.setActionName(cbaActionName);
119         request.setResolutionKey(UUID.randomUUID().toString());
120
121         // Inject AAI properties into payload map. Offer flexibility to the usecase
122         // implementation to inject whatever AAI parameters are of interest to them.
123         // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
124         request.setAaiProperties(aaiParams);
125
126         Builder struct = Struct.newBuilder();
127         try {
128             String requestStr = request.toString();
129             Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
130                 + "config-deploy-request from payload parameters: {}", payload);
131             JsonFormat.parser().merge(requestStr, struct);
132         } catch (InvalidProtocolBufferException e) {
133             LOGGER.error("Failed to parse received message. blueprint({}:{}) for action({})", cbaName, cbaVersion,
134                 cbaActionName, e);
135             return Optional.empty();
136         }
137
138         // Build CDS gRPC request common-header
139         CommonHeader commonHeader = CommonHeader.newBuilder()
140             .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
141             .setRequestId(onset.getRequestId().toString())
142             .setSubRequestId(operation.getSubRequestId())
143             .build();
144
145         // Build CDS gRPC request action-identifier
146         ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
147             .setBlueprintName(cbaName)
148             .setBlueprintVersion(cbaVersion)
149             .setActionName(cbaActionName)
150             .setMode(CdsActorConstants.CDS_MODE)
151             .build();
152
153         // Finally build the ExecutionServiceInput gRPC request object.
154         ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
155             .setCommonHeader(commonHeader)
156             .setActionIdentifiers(actionIdentifiers)
157             .setPayload(struct.build())
158             .build();
159         return Optional.of(executionServiceInput);
160     }
161
162     private boolean validateCdsMandatoryParams(Policy policy) {
163         if (policy == null || policy.getPayload() == null) {
164             return false;
165         }
166         Map<String, String> payload = policy.getPayload();
167         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
168         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
169         String cbaActionName = policy.getRecipe();
170         return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
171             .isNullOrEmpty(cbaActionName);
172     }
173
174     class CdsActorServiceManager implements CdsProcessorListener {
175
176         private final AtomicReference<String> cdsResponse = new AtomicReference<>();
177
178         /**
179          * {@inheritDoc}.
180          */
181         @Override
182         public void onMessage(final ExecutionServiceOutput message) {
183             LOGGER.info("Received notification from CDS: {}", message);
184             EventType eventType = message.getStatus().getEventType();
185             switch (eventType) {
186                 case EVENT_COMPONENT_FAILURE:
187                     cdsResponse.compareAndSet(null, CdsActorConstants.FAILED);
188                     break;
189                 case EVENT_COMPONENT_PROCESSING:
190                     cdsResponse.compareAndSet(null, CdsActorConstants.PROCESSING);
191                     break;
192                 case EVENT_COMPONENT_EXECUTED:
193                     cdsResponse.compareAndSet(null, CdsActorConstants.SUCCESS);
194                     break;
195                 default:
196                     cdsResponse.compareAndSet(null, CdsActorConstants.FAILED);
197                     break;
198             }
199         }
200
201         /**
202          * {@inheritDoc}.
203          */
204         @Override
205         public void onError(final Throwable throwable) {
206             Status status = Status.fromThrowable(throwable);
207             cdsResponse.compareAndSet(null, CdsActorConstants.ERROR);
208             LOGGER.error("Failed processing blueprint {} {}", status, throwable);
209         }
210
211         /**
212          * Send gRPC request to CDS to execute the blueprint.
213          *
214          * @param cdsClient CDS grpc client object.
215          * @param cdsProps CDS properties.
216          * @param executionServiceInput a valid CDS grpc request object.
217          * @return Status of the CDS request, null if timeout happens or onError is invoked for any reason.
218          */
219         public String sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
220             ExecutionServiceInput executionServiceInput) {
221             try {
222                 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
223                 // TO-DO: Handle requests asynchronously once the callback support is added to actors.
224                 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
225                 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
226                 if (!status) {
227                     cdsResponse.compareAndSet(null, CdsActorConstants.TIMED_OUT);
228                 }
229                 LOGGER.info("CDS response {}", getCdsResponse());
230             } catch (InterruptedException ex) {
231                 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
232                 cdsResponse.compareAndSet(null, CdsActorConstants.INTERRUPTED);
233                 Thread.currentThread().interrupt();
234             }
235             LOGGER.info("Status of the CDS gRPC request is: {}", getCdsResponse());
236             return getCdsResponse();
237         }
238
239         String getCdsResponse() {
240             return cdsResponse.get();
241         }
242     }
243 }