2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 TechMahindra
6 * ================================================================================
7 * Modifications Copyright (c) 2019 Samsung
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.so.client.cds;
25 import static org.onap.so.client.cds.PayloadConstants.CONTROLLER_ERROR_MESSAGE;
26 import com.google.protobuf.InvalidProtocolBufferException;
27 import com.google.protobuf.Struct;
28 import com.google.protobuf.Struct.Builder;
29 import com.google.protobuf.util.JsonFormat;
30 import io.grpc.Status;
31 import java.util.UUID;
32 import org.apache.commons.lang3.StringUtils;
33 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
34 import org.camunda.bpm.engine.ProcessEngine;
35 import org.camunda.bpm.engine.delegate.DelegateExecution;
36 import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder;
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
38 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
39 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
41 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
42 import org.onap.logging.filter.base.ONAPComponents;
43 import org.onap.so.bpmn.common.BuildingBlockExecution;
44 import org.onap.so.client.PreconditionFailedException;
45 import org.onap.so.client.RestPropertiesLoader;
46 import org.onap.so.client.cds.beans.AbstractCDSPropertiesBean;
47 import org.onap.so.client.exception.BadResponseException;
48 import org.onap.so.client.exception.ExceptionBuilder;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import org.springframework.beans.factory.annotation.Autowired;
52 import org.springframework.stereotype.Component;
53 import java.util.concurrent.CountDownLatch;
54 import java.util.concurrent.TimeUnit;
/** Utility class that supports calls to the CDS client. */
60 public class AbstractCDSProcessingBBUtils {
// Logger shared by the methods of the enclosing class and by ResponseHandler.
62 private static final Logger logger = LoggerFactory.getLogger(AbstractCDSProcessingBBUtils.class);
// Status values assigned by the CDS listeners declared further down in this file.
64 private static final String SUCCESS = "Success";
65 private static final String FAILED = "Failed";
66 private static final String PROCESSING = "Processing";
// Variable name under which the JSON rendering of the CDS response payload is stored.
67 private static final String RESPONSE_PAYLOAD = "CDSResponsePayload";
// Variable name under which the CDS status is stored.
68 private static final String CDS_STATUS = "ControllerStatus";
// Execution variable that holds the built ExecutionServiceInput.
69 private static final String EXEC_INPUT = "executionServiceInput";
// Execution variable from which the AbstractCDSPropertiesBean is read.
70 private static final String EXECUTION_OBJECT = "executionObject";
// Status recorded by ResponseHandler when its onError callback fires.
71 private static final String EXCEPTION = "Exception";
// Process variable that carries the subRequestId used to correlate the CDS reply.
72 private static final String CDS_REQUEST_ID = "CDS_REQUEST_ID";
// Name of the message created for correlation in sendRequestToCDSClientBB.
73 private static final String CONTROLLER_MESSAGE = "ControllerMessage";
// NOTE(review): not referenced in the visible part of this file - confirm it is still needed.
75 private static final String REQ_ID = "requestId";
// Receives every failure caught by the public methods via buildAndThrowWorkflowException(execution, 7000, ex).
78 protected ExceptionBuilder exceptionUtil;
// Provides the RuntimeService used to create message correlations.
81 private ProcessEngine processEngine;
84 * Extracting data from execution object and building the ExecutionServiceInput Object
86 * @param execution DelegateExecution object
88 public void constructExecutionServiceInputObject(DelegateExecution execution) {
89 logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest for DelegateExecution object.");
92 AbstractCDSPropertiesBean executionObject =
93 (AbstractCDSPropertiesBean) execution.getVariable(EXECUTION_OBJECT);
95 ExecutionServiceInput executionServiceInput = prepareExecutionServiceInput(executionObject);
97 execution.setVariable(EXEC_INPUT, executionServiceInput);
99 } catch (Exception ex) {
100 exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
105 * Extracting data from execution object and building the ExecutionServiceInput Object
107 * @param execution BuildingBlockExecution object
109 public void constructExecutionServiceInputObjectBB(BuildingBlockExecution execution) {
110 logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest for BuildingBlockExecution object.");
113 AbstractCDSPropertiesBean executionObject = execution.getVariable(EXECUTION_OBJECT);
115 ExecutionServiceInput executionServiceInput = prepareExecutionServiceInput(executionObject);
117 execution.setVariable(EXEC_INPUT, executionServiceInput);
118 logger.debug("Input payload: " + executionServiceInput.getPayload());
120 } catch (Exception ex) {
121 exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
126 * get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period
128 * @param execution DelegateExecution object
130 public void sendRequestToCDSClient(DelegateExecution execution) {
132 logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for DelegateExecution object.");
134 ExecutionServiceInput executionServiceInput = (ExecutionServiceInput) execution.getVariable(EXEC_INPUT);
135 CDSResponse cdsResponse = getCdsResponse(executionServiceInput);
136 execution.setVariable(CDS_STATUS, cdsResponse.status);
138 if (cdsResponse.payload != null) {
139 String payload = JsonFormat.printer().print(cdsResponse.payload);
140 execution.setVariable(RESPONSE_PAYLOAD, payload);
143 } catch (Exception ex) {
144 exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
149 * get the executionServiceInput object from execution and send a request to CDS Client
151 * @param execution BuildingBlockExecution object
153 public void sendRequestToCDSClientBB(BuildingBlockExecution execution) {
154 logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for BuildingBlockExecution object.");
156 ExecutionServiceInput executionServiceInput = execution.getVariable(EXEC_INPUT);
158 String messageCorrelationId = executionServiceInput.getCommonHeader().getSubRequestId();
159 if (StringUtils.isBlank(messageCorrelationId)) {
160 throw new IllegalArgumentException("subRequestId can not be blank");
162 execution.setVariable(CDS_REQUEST_ID, messageCorrelationId);
164 MessageCorrelationBuilder messageCorrelationBuilder =
165 processEngine.getRuntimeService().createMessageCorrelation(CONTROLLER_MESSAGE)
166 .processInstanceVariableEquals(CDS_REQUEST_ID, messageCorrelationId);
167 MessageSendingHandler handler = new MessageSendingHandler(messageCorrelationBuilder);
168 CDSProcessingClient client = new CDSProcessingClient(handler);
169 handler.setClient(client);
170 client.sendRequest(executionServiceInput);
171 } catch (Exception ex) {
172 exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
176 private CDSResponse getCdsResponse(ExecutionServiceInput executionServiceInput) throws BadResponseException {
177 CDSProperties props = RestPropertiesLoader.getInstance().getNewImpl(CDSProperties.class);
179 throw new PreconditionFailedException(
180 "No RestProperty.CDSProperties implementation found on classpath, can't create client.");
183 CDSResponse cdsResponse = new CDSResponse();
185 try (CDSProcessingClient cdsClient = new CDSProcessingClient(new ResponseHandler(cdsResponse))) {
186 CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
187 countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS);
188 } catch (InterruptedException ex) {
189 logger.error("Caught exception in sendRequestToCDSClient in AbstractCDSProcessingBBUtils : ", ex);
190 Thread.currentThread().interrupt();
193 String cdsResponseStatus = cdsResponse.status;
196 * throw CDS failed exception.
198 if (!cdsResponseStatus.equals(SUCCESS)) {
199 throw new BadResponseException("CDS call failed with status: " + cdsResponse.status + " and errorMessage: "
200 + cdsResponse.errorMessage);
205 private ExecutionServiceInput prepareExecutionServiceInput(AbstractCDSPropertiesBean executionObject) {
206 String payload = executionObject.getRequestObject();
208 CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(executionObject.getOriginatorId())
209 .setRequestId(executionObject.getRequestId()).setSubRequestId(executionObject.getSubRequestId())
211 ActionIdentifiers actionIdentifiers =
212 ActionIdentifiers.newBuilder().setBlueprintName(executionObject.getBlueprintName())
213 .setBlueprintVersion(executionObject.getBlueprintVersion())
214 .setActionName(executionObject.getActionName()).setMode(executionObject.getMode()).build();
216 Builder struct = Struct.newBuilder();
218 JsonFormat.parser().merge(payload, struct);
219 } catch (InvalidProtocolBufferException e) {
220 logger.error("Failed to parse received message. blueprint({}:{}) for action({}). {}",
221 executionObject.getBlueprintVersion(), executionObject.getBlueprintName(),
222 executionObject.getActionName(), e);
225 return ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader).setActionIdentifiers(actionIdentifiers)
226 .setPayload(struct.build()).build();
229 private class ResponseHandler implements CDSProcessingListener {
231 private CDSResponse cdsResponse;
233 ResponseHandler(CDSResponse cdsResponse) {
234 this.cdsResponse = cdsResponse;
238 * Get Response from CDS Client
241 public void onMessage(ExecutionServiceOutput message) {
242 logger.info("Received notification from CDS: {}", message);
243 EventType eventType = message.getStatus().getEventType();
246 case EVENT_COMPONENT_PROCESSING:
247 cdsResponse.status = PROCESSING;
249 case EVENT_COMPONENT_EXECUTED:
250 cdsResponse.status = SUCCESS;
253 cdsResponse.status = FAILED;
254 cdsResponse.errorMessage = message.getStatus().getErrorMessage();
257 cdsResponse.payload = message.getPayload();
261 * On error at CDS, log the error
264 public void onError(Throwable t) {
265 Status status = Status.fromThrowable(t);
266 logger.error("Failed processing blueprint {}", status, t);
267 cdsResponse.status = EXCEPTION;
// Listener created by sendRequestToCDSClientBB. It records the CDS outcome as variables on the message
// correlation builder it was constructed with.
// NOTE(review): several statements of this class are absent from this view; the comments below describe only
// the lines that are present.
271 private class MessageSendingHandler implements CDSProcessingListener {
// Correlation builder that receives the status, error message and payload variables.
273 private MessageCorrelationBuilder messageCorrelationBuilder;
// Set through setClient; closeClient treats a missing client as an illegal state.
274 private AutoCloseable client;
// Declares its own logger, which hides the enclosing class's logger inside this handler.
275 private Logger logger = LoggerFactory.getLogger(MessageSendingHandler.class);
277 MessageSendingHandler(MessageCorrelationBuilder messageCorrelationBuilder) {
278 this.messageCorrelationBuilder = messageCorrelationBuilder;
// Stores the client for closeClient; sendRequestToCDSClientBB calls this before sending the request.
281 public void setClient(AutoCloseable client) {
282 this.client = client;
// Logs the CDS message, then maps its event type to a status and copies status, error message and
// (when present) payload onto the correlation builder.
286 public void onMessage(ExecutionServiceOutput message) {
287 logger.info("Received payload from CDS: {}", message);
288 EventType eventType = message.getStatus().getEventType();
// EVENT_COMPONENT_PROCESSING is singled out here; the body of this branch is not visible in this view.
290 if (eventType == EventType.EVENT_COMPONENT_PROCESSING) {
// EVENT_COMPONENT_EXECUTED maps to Success; every other event type reaching this line maps to Failed.
294 String status = eventType == EventType.EVENT_COMPONENT_EXECUTED ? SUCCESS : FAILED;
295 messageCorrelationBuilder.setVariable(CDS_STATUS, status);
296 messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, message.getStatus().getErrorMessage());
// A payload that cannot be rendered as JSON is logged and the payload variable is not set.
298 if (message.hasPayload()) {
300 String payload = JsonFormat.printer().print(message.getPayload());
301 messageCorrelationBuilder.setVariable(RESPONSE_PAYLOAD, payload);
302 } catch (InvalidProtocolBufferException e) {
303 logger.error("Failed parsing cds response", e);
// Logs the failure and records it on the correlation builder as the error message with status Failed.
310 public void onError(Throwable t) {
311 logger.error("Failed sending CDS request", t);
312 messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, t.getMessage());
313 messageCorrelationBuilder.setVariable(CDS_STATUS, FAILED);
318 * When a CDS call returns before the bpmn process is in a waiting state, message correlation will fail. This
319 * retry logic will allow camunda some time to finish transitioning the process.
321 private void correlate() {
// Retry counter starts at 10; each failed attempt is logged as a warning with the current counter value.
323 int remainingTries = 10;
324 while (!tryCorrelateMessage() && remainingTries > 0) {
325 logger.warn("Message correlation failed. Retries remaining: {}", remainingTries);
// The interrupt flag is restored after logging so the interruption stays observable.
329 } catch (InterruptedException e) {
330 logger.error("Thread interrupted during message correlation", e);
331 Thread.currentThread().interrupt();
// Makes one correlation attempt; MismatchingMessageCorrelationException is the failure case handled here.
337 private boolean tryCorrelateMessage() {
339 messageCorrelationBuilder.correlate();
340 logger.info("Message correlation successful");
342 } catch (MismatchingMessageCorrelationException e) {
// A missing client is reported as an IllegalStateException; exceptions caught here are logged.
347 private void closeClient() {
349 throw new IllegalStateException("Client was not set and could not be closed");
352 } catch (Exception e) {
353 logger.error("Failed closing cds client", e);
358 private class CDSResponse {
365 public String toString() {
366 return "CDSResponse{" + "status='" + status + '\'' + ", errorMessage='" + errorMessage + '\'' + ", payload="