aaf07ac21be33fd946dcf0dabfd810f89abeaa9e
[policy/models.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2019 Bell Canada. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.controlloop.actor.cds;
20
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Strings;
23 import com.google.protobuf.InvalidProtocolBufferException;
24 import com.google.protobuf.Struct;
25 import com.google.protobuf.Struct.Builder;
26 import com.google.protobuf.util.JsonFormat;
27 import io.grpc.Status;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Optional;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicReference;
35 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
36 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
38 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
40 import org.onap.policy.cds.api.CdsProcessorListener;
41 import org.onap.policy.cds.client.CdsProcessorGrpcClient;
42 import org.onap.policy.cds.properties.CdsServerProperties;
43 import org.onap.policy.controlloop.ControlLoopOperation;
44 import org.onap.policy.controlloop.VirtualControlLoopEvent;
45 import org.onap.policy.controlloop.actor.cds.beans.ConfigDeployRequest;
46 import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
47 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
48 import org.onap.policy.controlloop.policy.Policy;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
54  */
55 public class CdsActorServiceProvider implements Actor {
56
57     private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
58
59     /**
60      * {@inheritDoc}.
61      */
62     @Override
63     public String actor() {
64         return CdsActorConstants.CDS_ACTOR;
65     }
66
67     /**
68      * {@inheritDoc}. Note: This is a placeholder for now.
69      */
70     @Override
71     public List<String> recipes() {
72         return new ArrayList<>();
73     }
74
75     /**
76      * {@inheritDoc}. Note: This is a placeholder for now.
77      */
78     @Override
79     public List<String> recipeTargets(final String recipe) {
80         return new ArrayList<>();
81     }
82
83     /**
84      * {@inheritDoc}. Note: This is a placeholder for now.
85      */
86     @Override
87     public List<String> recipePayloads(final String recipe) {
88         return new ArrayList<>();
89     }
90
91     /**
92      * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
93      * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
94      *
95      * @param onset the event that is reporting the alert for policy to perform an action.
96      * @param operation the control loop operation specifying the actor, operation, target, etc.
97      * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
98      * @param aaiParams Map of enriched AAI attributes in node.attribute notation.
99      * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
100      */
101     public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
102         ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
103
104         // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
105         // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
106         Map<String, String> payload = policy.getPayload();
107         if (!validateCdsMandatoryParams(policy)) {
108             return Optional.empty();
109         }
110         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
111         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
112         String cbaActionName = policy.getRecipe();
113
114         // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
115         ConfigDeployRequest request = new ConfigDeployRequest();
116         request.setConfigDeployProperties(payload);
117
118         // Inject AAI properties into payload map. Offer flexibility to the usecase
119         // implementation to inject whatever AAI parameters are of interest to them.
120         // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
121         request.setAaiProperties(aaiParams);
122
123         Builder struct = Struct.newBuilder();
124         try {
125             String requestStr = request.toString();
126             Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
127                 + "config-deploy-request from payload parameters: {}", payload);
128             JsonFormat.parser().merge(requestStr, struct);
129         } catch (InvalidProtocolBufferException e) {
130             LOGGER.error("Failed to parse received message. blueprint({}:{}) for action({})", cbaName, cbaVersion,
131                 cbaActionName, e);
132             return Optional.empty();
133         }
134
135         // Build CDS gRPC request common-header
136         CommonHeader commonHeader = CommonHeader.newBuilder()
137             .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
138             .setRequestId(onset.getRequestId().toString())
139             .setSubRequestId(operation.getSubRequestId())
140             .build();
141
142         // Build CDS gRPC request action-identifier
143         ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
144             .setBlueprintName(cbaName)
145             .setBlueprintVersion(cbaVersion)
146             .setActionName(cbaActionName)
147             .setMode(CdsActorConstants.CDS_MODE)
148             .build();
149
150         // Finally build the ExecutionServiceInput gRPC request object.
151         ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
152             .setCommonHeader(commonHeader)
153             .setActionIdentifiers(actionIdentifiers)
154             .setPayload(struct.build())
155             .build();
156         return Optional.of(executionServiceInput);
157     }
158
159     private boolean validateCdsMandatoryParams(Policy policy) {
160         if (policy == null || policy.getPayload() == null) {
161             return false;
162         }
163         Map<String, String> payload = policy.getPayload();
164         String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
165         String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
166         String cbaActionName = policy.getRecipe();
167         return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
168             .isNullOrEmpty(cbaActionName);
169     }
170
171     class CdsActorServiceManager implements CdsProcessorListener {
172
173         private final AtomicReference<String> cdsResponse = new AtomicReference<>();
174
175         /**
176          * {@inheritDoc}.
177          */
178         @Override
179         public void onMessage(final ExecutionServiceOutput message) {
180             LOGGER.info("Received notification from CDS: {}", message);
181             EventType eventType = message.getStatus().getEventType();
182             switch (eventType) {
183                 case EVENT_COMPONENT_FAILURE:
184                     cdsResponse.compareAndSet(null, CdsActorConstants.FAILED);
185                     break;
186                 case EVENT_COMPONENT_PROCESSING:
187                     cdsResponse.compareAndSet(null, CdsActorConstants.PROCESSING);
188                     break;
189                 case EVENT_COMPONENT_EXECUTED:
190                     cdsResponse.compareAndSet(null, CdsActorConstants.SUCCESS);
191                     break;
192                 default:
193                     cdsResponse.compareAndSet(null, CdsActorConstants.FAILED);
194                     break;
195             }
196         }
197
198         /**
199          * {@inheritDoc}.
200          */
201         @Override
202         public void onError(final Throwable throwable) {
203             Status status = Status.fromThrowable(throwable);
204             cdsResponse.compareAndSet(null, CdsActorConstants.ERROR);
205             LOGGER.error("Failed processing blueprint {} {}", status, throwable);
206         }
207
208         /**
209          * Send gRPC request to CDS to execute the blueprint.
210          *
211          * @param cdsClient CDS grpc client object.
212          * @param cdsProps CDS properties.
213          * @param executionServiceInput a valid CDS grpc request object.
214          * @return Status of the CDS request, null if timeout happens or onError is invoked for any reason.
215          */
216         public String sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
217             ExecutionServiceInput executionServiceInput) {
218             try {
219                 LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
220                 // TO-DO: Handle requests asynchronously once the callback support is added to actors.
221                 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
222                 boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
223                 if (!status) {
224                     cdsResponse.compareAndSet(null, CdsActorConstants.TIMED_OUT);
225                 }
226                 LOGGER.info("CDS response {}", getCdsResponse());
227             } catch (InterruptedException ex) {
228                 LOGGER.error("Caught exception in executeCdsBlueprintProcessor in CdsActorServiceProvider: ", ex);
229                 cdsResponse.compareAndSet(null, CdsActorConstants.INTERRUPTED);
230                 Thread.currentThread().interrupt();
231             }
232             LOGGER.info("Status of the CDS gRPC request is: {}", getCdsResponse());
233             return getCdsResponse();
234         }
235
236         String getCdsResponse() {
237             return cdsResponse.get();
238         }
239     }
240 }