Use "coder" to serialize Actor requests
[policy/models.git] / models-interactions / model-actors / actor.cds / src / main / java / org / onap / policy / controlloop / actor / cds / CdsActorServiceProvider.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019-2020 Bell Canada. All rights reserved.
4  * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * ============LICENSE_END=========================================================
18  */
19
20 package org.onap.policy.controlloop.actor.cds;
21
22 import com.google.common.base.Preconditions;
23 import com.google.common.base.Strings;
24 import com.google.protobuf.InvalidProtocolBufferException;
25 import com.google.protobuf.Struct;
26 import com.google.protobuf.Struct.Builder;
27 import com.google.protobuf.util.JsonFormat;
28 import io.grpc.Status;
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Optional;
33 import java.util.UUID;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
39 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
41 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
42 import org.onap.policy.cds.CdsResponse;
43 import org.onap.policy.cds.api.CdsProcessorListener;
44 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
45 import org.onap.policy.cds.properties.CdsServerProperties;
46 import org.onap.policy.common.utils.coder.CoderException;
47 import org.onap.policy.controlloop.ControlLoopOperation;
48 import org.onap.policy.controlloop.VirtualControlLoopEvent;
49 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
50 import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
51 import org.onap.policy.controlloop.actorserviceprovider.Operator;
52 import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl;
53 import org.onap.policy.controlloop.policy.Policy;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 /**
58  * CDS is an unusual actor in that it uses a single, generic operator to initiate all
59  * operation types. The action taken is always the same, only the operation name changes.
60  */
61 public class CdsActorServiceProvider extends ActorImpl {
62
63     private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
64
65     /**
66      * Constructs the object.
67      */
68     public CdsActorServiceProvider() {
69         super(CdsActorConstants.CDS_ACTOR);
70
71         addOperator(new GrpcOperator(CdsActorConstants.CDS_ACTOR, GrpcOperation.NAME, GrpcOperation::new));
72     }
73
74     @Override
75     public Operator getOperator(String name) {
76         /*
77          * All operations are managed by the same operator, regardless of the name.
78          */
79         return super.getOperator(GrpcOperation.NAME);
80     }
81
82     // TODO old code: remove lines down to **HERE**
83
84     /**
85      * {@inheritDoc}.
86      */
87     @Override
88     public String actor() {
89         return CdsActorConstants.CDS_ACTOR;
90     }
91
92     /**
93      * {@inheritDoc}. Note: This is a placeholder for now.
94      */
95     @Override
96     public List<String> recipes() {
97         return new ArrayList<>();
98     }
99
100     /**
101      * {@inheritDoc}. Note: This is a placeholder for now.
102      */
103     @Override
104     public List<String> recipeTargets(final String recipe) {
105         return new ArrayList<>();
106     }
107
108     /**
109      * {@inheritDoc}. Note: This is a placeholder for now.
110      */
111     @Override
112     public List<String> recipePayloads(final String recipe) {
113         return new ArrayList<>();
114     }
115
116     /**
117      * Build the CDS ExecutionServiceInput request from the policy object and the AAI
118      * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD
119      * item for Frankfurt release.
120      *
121      * @param onset the event that is reporting the alert for policy to perform an action.
122      * @param operation the control loop operation specifying the actor, operation,
123      *        target, etc.
124      * @param policy the policy specified from the yaml generated by CLAMP or through
125      *        Policy API.
126      * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
127      * @return an Optional ExecutionServiceInput instance if valid else an Optional empty
128      *         object is returned.
129      */
130     public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
131                     ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
132
133         // For the current operational TOSCA policy model (yaml) CBA name and version are
134         // embedded in the payload
135         // section, with the new policy type model being proposed in Frankfurt we will be
136         // able to move it out.
137         Map<String, String> payload = policy.getPayload();
138         if (!validateCdsMandatoryParams(policy)) {
139             return Optional.empty();
140         }
141         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
142         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
143
144         // Retain only the payload by removing CBA name and version once they are
145         // extracted
146         // to be put in CDS request header.
147         payload.remove(CdsActorConstants.KEY_CBA_NAME);
148         payload.remove(CdsActorConstants.KEY_CBA_VERSION);
149
150         // Embed payload from policy to ConfigDeployRequest object, serialize and inject
151         // into grpc request.
152         String cbaActionName = policy.getRecipe();
153         CdsActionRequest request = new CdsActionRequest();
154         request.setPolicyPayload(payload);
155         request.setActionName(cbaActionName);
156         request.setResolutionKey(UUID.randomUUID().toString());
157
158         // Inject AAI properties into payload map. Offer flexibility to the usecase
159         // implementation to inject whatever AAI parameters are of interest to them.
160         // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as
161         // needed by CDS.
162         request.setAaiProperties(aaiParams);
163
164         // Inject any additional event parameters that may be present in the onset event
165         if (onset.getAdditionalEventParams() != null) {
166             request.setAdditionalEventParams(onset.getAdditionalEventParams());
167         }
168
169         Builder struct = Struct.newBuilder();
170         try {
171             String requestStr = request.generateCdsPayload();
172             Preconditions.checkState(!Strings.isNullOrEmpty(requestStr),
173                             "Unable to build " + "config-deploy-request from payload parameters: {}", payload);
174             JsonFormat.parser().merge(requestStr, struct);
175         } catch (InvalidProtocolBufferException | CoderException e) {
176             LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
177                             cbaName, cbaVersion, cbaActionName, e);
178             return Optional.empty();
179         }
180
181         // Build CDS gRPC request common-header
182         CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
183                         .setRequestId(onset.getRequestId().toString()).setSubRequestId(operation.getSubRequestId())
184                         .build();
185
186         // Build CDS gRPC request action-identifier
187         ActionIdentifiers actionIdentifiers =
188                         ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion)
189                                         .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build();
190
191         // Finally build the ExecutionServiceInput gRPC request object.
192         ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader)
193                         .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build();
194         return Optional.of(executionServiceInput);
195     }
196
197     private boolean validateCdsMandatoryParams(Policy policy) {
198         if (policy == null || policy.getPayload() == null) {
199             return false;
200         }
201         Map<String, String> payload = policy.getPayload();
202         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
203         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
204         String cbaActionName = policy.getRecipe();
205         return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion)
206                         && !Strings.isNullOrEmpty(cbaActionName);
207     }
208
209     public class CdsActorServiceManager implements CdsProcessorListener {
210
211         private final AtomicReference<String> cdsStatus = new AtomicReference<>();
212
213         /**
214          * {@inheritDoc}.
215          */
216         @Override
217         public void onMessage(final ExecutionServiceOutput message) {
218             LOGGER.info("Received notification from CDS: {}", message);
219             EventType eventType = message.getStatus().getEventType();
220             switch (eventType) {
221                 case EVENT_COMPONENT_FAILURE:
222                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
223                     break;
224                 case EVENT_COMPONENT_PROCESSING:
225                     cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
226                     break;
227                 case EVENT_COMPONENT_EXECUTED:
228                     cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
229                     break;
230                 default:
231                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
232                     break;
233             }
234         }
235
236         /**
237          * {@inheritDoc}.
238          */
239         @Override
240         public void onError(final Throwable throwable) {
241             Status status = Status.fromThrowable(throwable);
242             cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
243             LOGGER.error("Failed processing blueprint {}", status, throwable);
244         }
245
246         /**
247          * Send gRPC request to CDS to execute the blueprint.
248          *
249          * @param cdsClient CDS grpc client object.
250          * @param cdsProps CDS properties.
251          * @param executionServiceInput a valid CDS grpc request object.
252          * @return the cds response.
253          */
254         public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
255                         ExecutionServiceInput executionServiceInput) {
256             try {
257                 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
258                 // TO-DO: Handle requests asynchronously once the callback support is
259                 // added to actors.
260                 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
261                 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
262                 if (!status) {
263                     cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
264                 }
265                 LOGGER.info("CDS status response {}", getCdsStatus());
266             } catch (InterruptedException ex) {
267                 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
268                 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
269                 Thread.currentThread().interrupt();
270             }
271             LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
272
273             CdsResponse response = new CdsResponse();
274             response.setRequestId(executionServiceInput != null && executionServiceInput.getCommonHeader() != null
275                             ? executionServiceInput.getCommonHeader().getRequestId()
276                             : null);
277             response.setStatus(this.getCdsStatus());
278             return response;
279         }
280
281         String getCdsStatus() {
282             return cdsStatus.get();
283         }
284     }
285
286     // **HERE**
287 }