Enable long-running processes in ControllerExecutionBB
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / onap / so / client / cds / AbstractCDSProcessingBBUtils.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2019 TechMahindra
6  * ================================================================================
7  * Modifications Copyright (c) 2019 Samsung
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.client.cds;
24
25 import static org.onap.so.client.cds.PayloadConstants.CONTROLLER_ERROR_MESSAGE;
26 import com.google.protobuf.InvalidProtocolBufferException;
27 import com.google.protobuf.Struct;
28 import com.google.protobuf.Struct.Builder;
29 import com.google.protobuf.util.JsonFormat;
30 import io.grpc.Status;
31 import java.util.UUID;
32 import org.apache.commons.lang3.StringUtils;
33 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
34 import org.camunda.bpm.engine.ProcessEngine;
35 import org.camunda.bpm.engine.delegate.DelegateExecution;
36 import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
39 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
41 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
42 import org.onap.logging.filter.base.ONAPComponents;
43 import org.onap.so.bpmn.common.BuildingBlockExecution;
44 import org.onap.so.client.PreconditionFailedException;
45 import org.onap.so.client.RestPropertiesLoader;
46 import org.onap.so.client.cds.beans.AbstractCDSPropertiesBean;
47 import org.onap.so.client.exception.BadResponseException;
48 import org.onap.so.client.exception.ExceptionBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import org.springframework.beans.factory.annotation.Autowired;
52 import org.springframework.stereotype.Component;
53 import java.util.concurrent.CountDownLatch;
54 import java.util.concurrent.TimeUnit;
55
56 /**
57  * Util class to support Call to CDS client
58  */
59 @Component
60 public class AbstractCDSProcessingBBUtils {
61
62     private static final Logger logger = LoggerFactory.getLogger(AbstractCDSProcessingBBUtils.class);
63
64     private static final String SUCCESS = "Success";
65     private static final String FAILED = "Failed";
66     private static final String PROCESSING = "Processing";
67     private static final String RESPONSE_PAYLOAD = "CDSResponsePayload";
68     private static final String CDS_STATUS = "ControllerStatus";
69     private static final String EXEC_INPUT = "executionServiceInput";
70     private static final String EXECUTION_OBJECT = "executionObject";
71     private static final String EXCEPTION = "Exception";
72     private static final String CDS_REQUEST_ID = "CDS_REQUEST_ID";
73     private static final String CONTROLLER_MESSAGE = "ControllerMessage";
74
75     private static final String REQ_ID = "requestId";
76
77     @Autowired
78     protected ExceptionBuilder exceptionUtil;
79
80     @Autowired
81     private ProcessEngine processEngine;
82
83     /**
84      * Extracting data from execution object and building the ExecutionServiceInput Object
85      *
86      * @param execution DelegateExecution object
87      */
88     public void constructExecutionServiceInputObject(DelegateExecution execution) {
89         logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest for DelegateExecution object.");
90
91         try {
92             AbstractCDSPropertiesBean executionObject =
93                     (AbstractCDSPropertiesBean) execution.getVariable(EXECUTION_OBJECT);
94
95             ExecutionServiceInput executionServiceInput = prepareExecutionServiceInput(executionObject);
96
97             execution.setVariable(EXEC_INPUT, executionServiceInput);
98
99         } catch (Exception ex) {
100             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
101         }
102     }
103
104     /**
105      * Extracting data from execution object and building the ExecutionServiceInput Object
106      *
107      * @param execution BuildingBlockExecution object
108      */
109     public void constructExecutionServiceInputObjectBB(BuildingBlockExecution execution) {
110         logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest for BuildingBlockExecution object.");
111
112         try {
113             AbstractCDSPropertiesBean executionObject = execution.getVariable(EXECUTION_OBJECT);
114
115             ExecutionServiceInput executionServiceInput = prepareExecutionServiceInput(executionObject);
116
117             execution.setVariable(EXEC_INPUT, executionServiceInput);
118             logger.debug("Input payload: " + executionServiceInput.getPayload());
119
120         } catch (Exception ex) {
121             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
122         }
123     }
124
125     /**
126      * get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period
127      *
128      * @param execution DelegateExecution object
129      */
130     public void sendRequestToCDSClient(DelegateExecution execution) {
131
132         logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for DelegateExecution object.");
133         try {
134             ExecutionServiceInput executionServiceInput = (ExecutionServiceInput) execution.getVariable(EXEC_INPUT);
135             CDSResponse cdsResponse = getCdsResponse(executionServiceInput);
136             execution.setVariable(CDS_STATUS, cdsResponse.status);
137
138             if (cdsResponse.payload != null) {
139                 String payload = JsonFormat.printer().print(cdsResponse.payload);
140                 execution.setVariable(RESPONSE_PAYLOAD, payload);
141             }
142
143         } catch (Exception ex) {
144             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
145         }
146     }
147
148     /**
149      * get the executionServiceInput object from execution and send a request to CDS Client
150      *
151      * @param execution BuildingBlockExecution object
152      */
153     public void sendRequestToCDSClientBB(BuildingBlockExecution execution) {
154         logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for BuildingBlockExecution object.");
155         try {
156             ExecutionServiceInput executionServiceInput = execution.getVariable(EXEC_INPUT);
157
158             String messageCorrelationId = executionServiceInput.getCommonHeader().getSubRequestId();
159             if (StringUtils.isBlank(messageCorrelationId)) {
160                 throw new IllegalArgumentException("subRequestId can not be blank");
161             }
162             execution.setVariable(CDS_REQUEST_ID, messageCorrelationId);
163
164             MessageCorrelationBuilder messageCorrelationBuilder =
165                     processEngine.getRuntimeService().createMessageCorrelation(CONTROLLER_MESSAGE)
166                             .processInstanceVariableEquals(CDS_REQUEST_ID, messageCorrelationId);
167             MessageSendingHandler handler = new MessageSendingHandler(messageCorrelationBuilder);
168             CDSProcessingClient client = new CDSProcessingClient(handler);
169             handler.setClient(client);
170             client.sendRequest(executionServiceInput);
171         } catch (Exception ex) {
172             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
173         }
174     }
175
176     private CDSResponse getCdsResponse(ExecutionServiceInput executionServiceInput) throws BadResponseException {
177         CDSProperties props = RestPropertiesLoader.getInstance().getNewImpl(CDSProperties.class);
178         if (props == null) {
179             throw new PreconditionFailedException(
180                     "No RestProperty.CDSProperties implementation found on classpath, can't create client.");
181         }
182
183         CDSResponse cdsResponse = new CDSResponse();
184
185         try (CDSProcessingClient cdsClient = new CDSProcessingClient(new ResponseHandler(cdsResponse))) {
186             CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
187             countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS);
188         } catch (InterruptedException ex) {
189             logger.error("Caught exception in sendRequestToCDSClient in AbstractCDSProcessingBBUtils : ", ex);
190             Thread.currentThread().interrupt();
191         }
192
193         String cdsResponseStatus = cdsResponse.status;
194
195         /**
196          * throw CDS failed exception.
197          */
198         if (!cdsResponseStatus.equals(SUCCESS)) {
199             throw new BadResponseException("CDS call failed with status: " + cdsResponse.status + " and errorMessage: "
200                     + cdsResponse.errorMessage);
201         }
202         return cdsResponse;
203     }
204
205     private ExecutionServiceInput prepareExecutionServiceInput(AbstractCDSPropertiesBean executionObject) {
206         String payload = executionObject.getRequestObject();
207
208         CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(executionObject.getOriginatorId())
209                 .setRequestId(executionObject.getRequestId()).setSubRequestId(executionObject.getSubRequestId())
210                 .build();
211         ActionIdentifiers actionIdentifiers =
212                 ActionIdentifiers.newBuilder().setBlueprintName(executionObject.getBlueprintName())
213                         .setBlueprintVersion(executionObject.getBlueprintVersion())
214                         .setActionName(executionObject.getActionName()).setMode(executionObject.getMode()).build();
215
216         Builder struct = Struct.newBuilder();
217         try {
218             JsonFormat.parser().merge(payload, struct);
219         } catch (InvalidProtocolBufferException e) {
220             logger.error("Failed to parse received message. blueprint({}:{}) for action({}). {}",
221                     executionObject.getBlueprintVersion(), executionObject.getBlueprintName(),
222                     executionObject.getActionName(), e);
223         }
224
225         return ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader).setActionIdentifiers(actionIdentifiers)
226                 .setPayload(struct.build()).build();
227     }
228
229     private class ResponseHandler implements CDSProcessingListener {
230
231         private CDSResponse cdsResponse;
232
233         ResponseHandler(CDSResponse cdsResponse) {
234             this.cdsResponse = cdsResponse;
235         }
236
237         /**
238          * Get Response from CDS Client
239          */
240         @Override
241         public void onMessage(ExecutionServiceOutput message) {
242             logger.info("Received notification from CDS: {}", message);
243             EventType eventType = message.getStatus().getEventType();
244
245             switch (eventType) {
246                 case EVENT_COMPONENT_PROCESSING:
247                     cdsResponse.status = PROCESSING;
248                     break;
249                 case EVENT_COMPONENT_EXECUTED:
250                     cdsResponse.status = SUCCESS;
251                     break;
252                 default:
253                     cdsResponse.status = FAILED;
254                     cdsResponse.errorMessage = message.getStatus().getErrorMessage();
255                     break;
256             }
257             cdsResponse.payload = message.getPayload();
258         }
259
260         /**
261          * On error at CDS, log the error
262          */
263         @Override
264         public void onError(Throwable t) {
265             Status status = Status.fromThrowable(t);
266             logger.error("Failed processing blueprint {}", status, t);
267             cdsResponse.status = EXCEPTION;
268         }
269     }
270
271     private class MessageSendingHandler implements CDSProcessingListener {
272
273         private MessageCorrelationBuilder messageCorrelationBuilder;
274         private AutoCloseable client;
275         private Logger logger = LoggerFactory.getLogger(MessageSendingHandler.class);
276
277         MessageSendingHandler(MessageCorrelationBuilder messageCorrelationBuilder) {
278             this.messageCorrelationBuilder = messageCorrelationBuilder;
279         }
280
281         public void setClient(AutoCloseable client) {
282             this.client = client;
283         }
284
285         @Override
286         public void onMessage(ExecutionServiceOutput message) {
287             logger.info("Received payload from CDS: {}", message);
288             EventType eventType = message.getStatus().getEventType();
289
290             if (eventType == EventType.EVENT_COMPONENT_PROCESSING) {
291                 return;
292             }
293
294             String status = eventType == EventType.EVENT_COMPONENT_EXECUTED ? SUCCESS : FAILED;
295             messageCorrelationBuilder.setVariable(CDS_STATUS, status);
296             messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, message.getStatus().getErrorMessage());
297
298             if (message.hasPayload()) {
299                 try {
300                     String payload = JsonFormat.printer().print(message.getPayload());
301                     messageCorrelationBuilder.setVariable(RESPONSE_PAYLOAD, payload);
302                 } catch (InvalidProtocolBufferException e) {
303                     logger.error("Failed parsing cds response", e);
304                 }
305             }
306             correlate();
307         }
308
309         @Override
310         public void onError(Throwable t) {
311             logger.error("Failed sending CDS request", t);
312             messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, t.getMessage());
313             messageCorrelationBuilder.setVariable(CDS_STATUS, FAILED);
314             correlate();
315         }
316
317         /**
318          * When a CDS call returns before the bpmn process is in a waiting state, message correlation will fail. This
319          * retry logic will allow camunda some time to finish transitioning the process.
320          */
321         private void correlate() {
322             try {
323                 int remainingTries = 10;
324                 while (!tryCorrelateMessage() && remainingTries > 0) {
325                     logger.warn("Message correlation failed. Retries remaining: {}", remainingTries);
326                     remainingTries--;
327                     Thread.sleep(1000L);
328                 }
329             } catch (InterruptedException e) {
330                 logger.error("Thread interrupted during message correlation", e);
331                 Thread.currentThread().interrupt();
332             } finally {
333                 closeClient();
334             }
335         }
336
337         private boolean tryCorrelateMessage() {
338             try {
339                 messageCorrelationBuilder.correlate();
340                 logger.info("Message correlation successful");
341                 return true;
342             } catch (MismatchingMessageCorrelationException e) {
343                 return false;
344             }
345         }
346
347         private void closeClient() {
348             if (client == null)
349                 throw new IllegalStateException("Client was not set and could not be closed");
350             try {
351                 client.close();
352             } catch (Exception e) {
353                 logger.error("Failed closing cds client", e);
354             }
355         }
356     }
357
358     private class CDSResponse {
359
360         String status;
361         String errorMessage;
362         Struct payload;
363
364         @Override
365         public String toString() {
366             return "CDSResponse{" + "status='" + status + '\'' + ", errorMessage='" + errorMessage + '\'' + ", payload="
367                     + payload + '}';
368         }
369     }
370 }