d630d18aee55c586aa639471b5f14af7d3ea4016
[policy/models.git] / models-interactions / model-actors / actor.cds / src / main / java / org / onap / policy / controlloop / actor / cds / CdsActor.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019-2020 Bell Canada. All rights reserved.
4  * Modifications Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * ============LICENSE_END=========================================================
18  */
19
20 package org.onap.policy.controlloop.actor.cds;
21
22 import com.google.common.base.Preconditions;
23 import com.google.common.base.Strings;
24 import com.google.protobuf.InvalidProtocolBufferException;
25 import com.google.protobuf.Struct;
26 import com.google.protobuf.Struct.Builder;
27 import com.google.protobuf.util.JsonFormat;
28 import io.grpc.Status;
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Optional;
33 import java.util.UUID;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicReference;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
39 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
41 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
42 import org.onap.policy.cds.CdsResponse;
43 import org.onap.policy.cds.api.CdsProcessorListener;
44 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
45 import org.onap.policy.cds.properties.CdsServerProperties;
46 import org.onap.policy.common.utils.coder.CoderException;
47 import org.onap.policy.controlloop.ControlLoopOperation;
48 import org.onap.policy.controlloop.VirtualControlLoopEvent;
49 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
50 import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
51 import org.onap.policy.controlloop.actorserviceprovider.Operator;
52 import org.onap.policy.controlloop.actorserviceprovider.impl.ActorImpl;
53 import org.onap.policy.controlloop.policy.Policy;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 /**
58  * CDS is an unusual actor in that it uses a single, generic operator to initiate all
59  * operation types. The action taken is always the same, only the operation name changes.
60  */
61 public class CdsActor extends ActorImpl {
62     public static final String NAME = CdsActorConstants.CDS_ACTOR;
63
64     private static final Logger LOGGER = LoggerFactory.getLogger(CdsActor.class);
65
66     /**
67      * Constructs the object.
68      */
69     public CdsActor() {
70         super(CdsActorConstants.CDS_ACTOR);
71
72         addOperator(new GrpcOperator(CdsActorConstants.CDS_ACTOR, GrpcOperation.NAME, GrpcOperation::new));
73     }
74
75     @Override
76     public Operator getOperator(String name) {
77         /*
78          * All operations are managed by the same operator, regardless of the name.
79          */
80         return super.getOperator(GrpcOperation.NAME);
81     }
82
83     // TODO old code: remove lines down to **HERE**
84
85     /**
86      * {@inheritDoc}.
87      */
88     @Override
89     public String actor() {
90         return CdsActorConstants.CDS_ACTOR;
91     }
92
93     /**
94      * {@inheritDoc}. Note: This is a placeholder for now.
95      */
96     @Override
97     public List<String> recipes() {
98         return new ArrayList<>();
99     }
100
101     /**
102      * {@inheritDoc}. Note: This is a placeholder for now.
103      */
104     @Override
105     public List<String> recipeTargets(final String recipe) {
106         return new ArrayList<>();
107     }
108
109     /**
110      * {@inheritDoc}. Note: This is a placeholder for now.
111      */
112     @Override
113     public List<String> recipePayloads(final String recipe) {
114         return new ArrayList<>();
115     }
116
117     /**
118      * Build the CDS ExecutionServiceInput request from the policy object and the AAI
119      * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD
120      * item for Frankfurt release.
121      *
122      * @param onset the event that is reporting the alert for policy to perform an action.
123      * @param operation the control loop operation specifying the actor, operation,
124      *        target, etc.
125      * @param policy the policy specified from the yaml generated by CLAMP or through
126      *        Policy API.
127      * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
128      * @return an Optional ExecutionServiceInput instance if valid else an Optional empty
129      *         object is returned.
130      */
131     public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
132                     ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
133
134         // For the current operational TOSCA policy model (yaml) CBA name and version are
135         // embedded in the payload
136         // section, with the new policy type model being proposed in Frankfurt we will be
137         // able to move it out.
138         Map<String, String> payload = policy.getPayload();
139         if (!validateCdsMandatoryParams(policy)) {
140             return Optional.empty();
141         }
142         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
143         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
144
145         // Retain only the payload by removing CBA name and version once they are
146         // extracted
147         // to be put in CDS request header.
148         payload.remove(CdsActorConstants.KEY_CBA_NAME);
149         payload.remove(CdsActorConstants.KEY_CBA_VERSION);
150
151         // Embed payload from policy to ConfigDeployRequest object, serialize and inject
152         // into grpc request.
153         String cbaActionName = policy.getRecipe();
154         CdsActionRequest request = new CdsActionRequest();
155         request.setPolicyPayload(payload);
156         request.setActionName(cbaActionName);
157         request.setResolutionKey(UUID.randomUUID().toString());
158
159         // Inject AAI properties into payload map. Offer flexibility to the usecase
160         // implementation to inject whatever AAI parameters are of interest to them.
161         // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as
162         // needed by CDS.
163         request.setAaiProperties(aaiParams);
164
165         // Inject any additional event parameters that may be present in the onset event
166         if (onset.getAdditionalEventParams() != null) {
167             request.setAdditionalEventParams(onset.getAdditionalEventParams());
168         }
169
170         Builder struct = Struct.newBuilder();
171         try {
172             String requestStr = request.generateCdsPayload();
173             Preconditions.checkState(!Strings.isNullOrEmpty(requestStr),
174                             "Unable to build " + "config-deploy-request from payload parameters: {}", payload);
175             JsonFormat.parser().merge(requestStr, struct);
176         } catch (InvalidProtocolBufferException | CoderException e) {
177             LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
178                             cbaName, cbaVersion, cbaActionName, e);
179             return Optional.empty();
180         }
181
182         // Build CDS gRPC request common-header
183         CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
184                         .setRequestId(onset.getRequestId().toString()).setSubRequestId(operation.getSubRequestId())
185                         .build();
186
187         // Build CDS gRPC request action-identifier
188         ActionIdentifiers actionIdentifiers =
189                         ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion)
190                                         .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build();
191
192         // Finally build the ExecutionServiceInput gRPC request object.
193         ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader)
194                         .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build();
195         return Optional.of(executionServiceInput);
196     }
197
198     private boolean validateCdsMandatoryParams(Policy policy) {
199         if (policy == null || policy.getPayload() == null) {
200             return false;
201         }
202         Map<String, String> payload = policy.getPayload();
203         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
204         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
205         String cbaActionName = policy.getRecipe();
206         return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion)
207                         && !Strings.isNullOrEmpty(cbaActionName);
208     }
209
210     public class CdsActorServiceManager implements CdsProcessorListener {
211
212         private final AtomicReference<String> cdsStatus = new AtomicReference<>();
213
214         /**
215          * {@inheritDoc}.
216          */
217         @Override
218         public void onMessage(final ExecutionServiceOutput message) {
219             LOGGER.info("Received notification from CDS: {}", message);
220             EventType eventType = message.getStatus().getEventType();
221             switch (eventType) {
222                 case EVENT_COMPONENT_FAILURE:
223                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
224                     break;
225                 case EVENT_COMPONENT_PROCESSING:
226                     cdsStatus.compareAndSet(null, CdsActorConstants.PROCESSING);
227                     break;
228                 case EVENT_COMPONENT_EXECUTED:
229                     cdsStatus.compareAndSet(null, CdsActorConstants.SUCCESS);
230                     break;
231                 default:
232                     cdsStatus.compareAndSet(null, CdsActorConstants.FAILED);
233                     break;
234             }
235         }
236
237         /**
238          * {@inheritDoc}.
239          */
240         @Override
241         public void onError(final Throwable throwable) {
242             Status status = Status.fromThrowable(throwable);
243             cdsStatus.compareAndSet(null, CdsActorConstants.ERROR);
244             LOGGER.error("Failed processing blueprint {}", status, throwable);
245         }
246
247         /**
248          * Send gRPC request to CDS to execute the blueprint.
249          *
250          * @param cdsClient CDS grpc client object.
251          * @param cdsProps CDS properties.
252          * @param executionServiceInput a valid CDS grpc request object.
253          * @return the cds response.
254          */
255         public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
256                         ExecutionServiceInput executionServiceInput) {
257             try {
258                 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
259                 // TO-DO: Handle requests asynchronously once the callback support is
260                 // added to actors.
261                 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
262                 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
263                 if (!status) {
264                     cdsStatus.compareAndSet(null, CdsActorConstants.TIMED_OUT);
265                 }
266                 LOGGER.info("CDS status response {}", getCdsStatus());
267             } catch (InterruptedException ex) {
268                 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
269                 cdsStatus.compareAndSet(null, CdsActorConstants.INTERRUPTED);
270                 Thread.currentThread().interrupt();
271             }
272             LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
273
274             CdsResponse response = new CdsResponse();
275             response.setRequestId(executionServiceInput != null && executionServiceInput.getCommonHeader() != null
276                             ? executionServiceInput.getCommonHeader().getRequestId()
277                             : null);
278             response.setStatus(this.getCdsStatus());
279             return response;
280         }
281
282         String getCdsStatus() {
283             return cdsStatus.get();
284         }
285     }
286
287     // **HERE**
288 }