Enable long-running processes in ControllerExecutionBB
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / onap / so / client / cds / AbstractCDSProcessingBBUtils.java
index 5498b5b..3ed1011 100644 (file)
 
 package org.onap.so.client.cds;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import static org.onap.so.client.cds.PayloadConstants.CONTROLLER_ERROR_MESSAGE;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Struct.Builder;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.Status;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
+import org.camunda.bpm.engine.ProcessEngine;
 import org.camunda.bpm.engine.delegate.DelegateExecution;
+import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.logging.filter.base.ONAPComponents;
+import org.onap.so.bpmn.common.BuildingBlockExecution;
 import org.onap.so.client.PreconditionFailedException;
 import org.onap.so.client.RestPropertiesLoader;
 import org.onap.so.client.cds.beans.AbstractCDSPropertiesBean;
@@ -40,72 +50,72 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Struct;
-import com.google.protobuf.Struct.Builder;
-import com.google.protobuf.util.JsonFormat;
-import io.grpc.Status;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Util class to support Call to CDS client
- *
  */
 @Component
-public class AbstractCDSProcessingBBUtils implements CDSProcessingListener {
+public class AbstractCDSProcessingBBUtils {
 
     private static final Logger logger = LoggerFactory.getLogger(AbstractCDSProcessingBBUtils.class);
 
     private static final String SUCCESS = "Success";
     private static final String FAILED = "Failed";
     private static final String PROCESSING = "Processing";
-
-    /**
-     * indicate exception thrown.
-     */
+    private static final String RESPONSE_PAYLOAD = "CDSResponsePayload";
+    private static final String CDS_STATUS = "ControllerStatus";
+    private static final String EXEC_INPUT = "executionServiceInput";
+    private static final String EXECUTION_OBJECT = "executionObject";
     private static final String EXCEPTION = "Exception";
+    private static final String CDS_REQUEST_ID = "CDS_REQUEST_ID";
+    private static final String CONTROLLER_MESSAGE = "ControllerMessage";
 
+    private static final String REQ_ID = "requestId";
 
-    private final AtomicReference<String> cdsResponse = new AtomicReference<>();
+    @Autowired
+    protected ExceptionBuilder exceptionUtil;
 
     @Autowired
-    private ExceptionBuilder exceptionUtil;
+    private ProcessEngine processEngine;
 
     /**
      * Extracting data from execution object and building the ExecutionServiceInput Object
-     * 
+     *
      * @param execution DelegateExecution object
      */
     public void constructExecutionServiceInputObject(DelegateExecution execution) {
-        logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest ");
+        logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest for DelegateExecution object.");
 
         try {
             AbstractCDSPropertiesBean executionObject =
-                    (AbstractCDSPropertiesBean) execution.getVariable("executionObject");
+                    (AbstractCDSPropertiesBean) execution.getVariable(EXECUTION_OBJECT);
 
-            String payload = executionObject.getRequestObject();
+            ExecutionServiceInput executionServiceInput = prepareExecutionServiceInput(executionObject);
 
-            CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(executionObject.getOriginatorId())
-                    .setRequestId(executionObject.getRequestId()).setSubRequestId(executionObject.getSubRequestId())
-                    .build();
-            ActionIdentifiers actionIdentifiers =
-                    ActionIdentifiers.newBuilder().setBlueprintName(executionObject.getBlueprintName())
-                            .setBlueprintVersion(executionObject.getBlueprintVersion())
-                            .setActionName(executionObject.getActionName()).setMode(executionObject.getMode()).build();
+            execution.setVariable(EXEC_INPUT, executionServiceInput);
 
-            Builder struct = Struct.newBuilder();
-            try {
-                JsonFormat.parser().merge(payload, struct);
-            } catch (InvalidProtocolBufferException e) {
-                logger.error("Failed to parse received message. blueprint({}:{}) for action({}). {}",
-                        executionObject.getBlueprintVersion(), executionObject.getBlueprintName(),
-                        executionObject.getActionName(), e);
-            }
+        } catch (Exception ex) {
+            exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
+        }
+    }
+
+    /**
+     * Extracting data from execution object and building the ExecutionServiceInput Object
+     *
+     * @param execution BuildingBlockExecution object
+     */
+    public void constructExecutionServiceInputObjectBB(BuildingBlockExecution execution) {
+        logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest for BuildingBlockExecution object.");
 
-            ExecutionServiceInput executionServiceInput =
-                    ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader)
-                            .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build();
+        try {
+            AbstractCDSPropertiesBean executionObject = execution.getVariable(EXECUTION_OBJECT);
 
-            execution.setVariable("executionServiceInput", executionServiceInput);
+            ExecutionServiceInput executionServiceInput = prepareExecutionServiceInput(executionObject);
+
+            execution.setVariable(EXEC_INPUT, executionServiceInput);
+            logger.debug("Input payload: " + executionServiceInput.getPayload());
 
         } catch (Exception ex) {
             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
@@ -114,85 +124,247 @@ public class AbstractCDSProcessingBBUtils implements CDSProcessingListener {
 
     /**
      * get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period
-     * 
+     *
      * @param execution DelegateExecution object
      */
     public void sendRequestToCDSClient(DelegateExecution execution) {
 
-        logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient ");
+        logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for DelegateExecution object.");
         try {
-            CDSProperties props = RestPropertiesLoader.getInstance().getNewImpl(CDSProperties.class);
-            if (props == null) {
-                throw new PreconditionFailedException(
-                        "No RestProperty.CDSProperties implementation found on classpath, can't create client.");
-            }
-
-            ExecutionServiceInput executionServiceInput =
-                    (ExecutionServiceInput) execution.getVariable("executionServiceInput");
+            ExecutionServiceInput executionServiceInput = (ExecutionServiceInput) execution.getVariable(EXEC_INPUT);
+            CDSResponse cdsResponse = getCdsResponse(executionServiceInput);
+            execution.setVariable(CDS_STATUS, cdsResponse.status);
 
-            try (CDSProcessingClient cdsClient = new CDSProcessingClient(this)) {
-                CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
-                countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS);
-            } catch (InterruptedException ex) {
-                logger.error("Caught exception in sendRequestToCDSClient in AbstractCDSProcessingBBUtils : ", ex);
-                Thread.currentThread().interrupt();
+            if (cdsResponse.payload != null) {
+                String payload = JsonFormat.printer().print(cdsResponse.payload);
+                execution.setVariable(RESPONSE_PAYLOAD, payload);
             }
 
-            if (cdsResponse != null) {
-                String cdsResponseStatus = cdsResponse.get();
-                execution.setVariable("CDSStatus", cdsResponseStatus);
+        } catch (Exception ex) {
+            exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
+        }
+    }
 
-                /**
-                 * throw CDS failed exception.
-                 */
-                if (cdsResponseStatus != SUCCESS) {
-                    throw new BadResponseException("CDS call failed with status: " + cdsResponseStatus);
-                }
+    /**
+     * get the executionServiceInput object from execution and send a request to CDS Client
+     *
+     * @param execution BuildingBlockExecution object
+     */
+    public void sendRequestToCDSClientBB(BuildingBlockExecution execution) {
+        logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for BuildingBlockExecution object.");
+        try {
+            ExecutionServiceInput executionServiceInput = execution.getVariable(EXEC_INPUT);
+
+            String messageCorrelationId = executionServiceInput.getCommonHeader().getSubRequestId();
+            if (StringUtils.isBlank(messageCorrelationId)) {
+                throw new IllegalArgumentException("subRequestId can not be blank");
             }
+            execution.setVariable(CDS_REQUEST_ID, messageCorrelationId);
 
+            MessageCorrelationBuilder messageCorrelationBuilder =
+                    processEngine.getRuntimeService().createMessageCorrelation(CONTROLLER_MESSAGE)
+                            .processInstanceVariableEquals(CDS_REQUEST_ID, messageCorrelationId);
+            MessageSendingHandler handler = new MessageSendingHandler(messageCorrelationBuilder);
+            CDSProcessingClient client = new CDSProcessingClient(handler);
+            handler.setClient(client);
+            client.sendRequest(executionServiceInput);
         } catch (Exception ex) {
             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
         }
     }
 
-    /**
-     * Get Response from CDS Client
-     * 
-     */
-    @Override
-    public void onMessage(ExecutionServiceOutput message) {
-        logger.info("Received notification from CDS: {}", message);
-        EventType eventType = message.getStatus().getEventType();
-
-        switch (eventType) {
-
-            case EVENT_COMPONENT_FAILURE:
-                // failed processing with failure
-                cdsResponse.set(FAILED);
-                break;
-            case EVENT_COMPONENT_PROCESSING:
-                // still processing
-                cdsResponse.set(PROCESSING);
-                break;
-            case EVENT_COMPONENT_EXECUTED:
-                // done with async processing
-                cdsResponse.set(SUCCESS);
-                break;
-            default:
-                cdsResponse.set(FAILED);
-                break;
+    private CDSResponse getCdsResponse(ExecutionServiceInput executionServiceInput) throws BadResponseException {
+        CDSProperties props = RestPropertiesLoader.getInstance().getNewImpl(CDSProperties.class);
+        if (props == null) {
+            throw new PreconditionFailedException(
+                    "No RestProperty.CDSProperties implementation found on classpath, can't create client.");
         }
 
+        CDSResponse cdsResponse = new CDSResponse();
+
+        try (CDSProcessingClient cdsClient = new CDSProcessingClient(new ResponseHandler(cdsResponse))) {
+            CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
+            countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS);
+        } catch (InterruptedException ex) {
+            logger.error("Caught exception in sendRequestToCDSClient in AbstractCDSProcessingBBUtils : ", ex);
+            Thread.currentThread().interrupt();
+        }
+
+        String cdsResponseStatus = cdsResponse.status;
+
+        /**
+         * throw CDS failed exception.
+         */
+        if (!cdsResponseStatus.equals(SUCCESS)) {
+            throw new BadResponseException("CDS call failed with status: " + cdsResponse.status + " and errorMessage: "
+                    + cdsResponse.errorMessage);
+        }
+        return cdsResponse;
     }
 
-    /**
-     * On error at CDS, log the error
-     */
-    @Override
-    public void onError(Throwable t) {
-        Status status = Status.fromThrowable(t);
-        logger.error("Failed processing blueprint {}", status, t);
-        cdsResponse.set(EXCEPTION);
+    private ExecutionServiceInput prepareExecutionServiceInput(AbstractCDSPropertiesBean executionObject) {
+        String payload = executionObject.getRequestObject();
+
+        CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(executionObject.getOriginatorId())
+                .setRequestId(executionObject.getRequestId()).setSubRequestId(executionObject.getSubRequestId())
+                .build();
+        ActionIdentifiers actionIdentifiers =
+                ActionIdentifiers.newBuilder().setBlueprintName(executionObject.getBlueprintName())
+                        .setBlueprintVersion(executionObject.getBlueprintVersion())
+                        .setActionName(executionObject.getActionName()).setMode(executionObject.getMode()).build();
+
+        Builder struct = Struct.newBuilder();
+        try {
+            JsonFormat.parser().merge(payload, struct);
+        } catch (InvalidProtocolBufferException e) {
+            logger.error("Failed to parse received message. blueprint({}:{}) for action({}). {}",
+                    executionObject.getBlueprintVersion(), executionObject.getBlueprintName(),
+                    executionObject.getActionName(), e);
+        }
+
+        return ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader).setActionIdentifiers(actionIdentifiers)
+                .setPayload(struct.build()).build();
+    }
+
+    private class ResponseHandler implements CDSProcessingListener {
+
+        private CDSResponse cdsResponse;
+
+        ResponseHandler(CDSResponse cdsResponse) {
+            this.cdsResponse = cdsResponse;
+        }
+
+        /**
+         * Get Response from CDS Client
+         */
+        @Override
+        public void onMessage(ExecutionServiceOutput message) {
+            logger.info("Received notification from CDS: {}", message);
+            EventType eventType = message.getStatus().getEventType();
+
+            switch (eventType) {
+                case EVENT_COMPONENT_PROCESSING:
+                    cdsResponse.status = PROCESSING;
+                    break;
+                case EVENT_COMPONENT_EXECUTED:
+                    cdsResponse.status = SUCCESS;
+                    break;
+                default:
+                    cdsResponse.status = FAILED;
+                    cdsResponse.errorMessage = message.getStatus().getErrorMessage();
+                    break;
+            }
+            cdsResponse.payload = message.getPayload();
+        }
+
+        /**
+         * On error at CDS, log the error
+         */
+        @Override
+        public void onError(Throwable t) {
+            Status status = Status.fromThrowable(t);
+            logger.error("Failed processing blueprint {}", status, t);
+            cdsResponse.status = EXCEPTION;
+        }
     }
 
+    private class MessageSendingHandler implements CDSProcessingListener {
+
+        private MessageCorrelationBuilder messageCorrelationBuilder;
+        private AutoCloseable client;
+        private Logger logger = LoggerFactory.getLogger(MessageSendingHandler.class);
+
+        MessageSendingHandler(MessageCorrelationBuilder messageCorrelationBuilder) {
+            this.messageCorrelationBuilder = messageCorrelationBuilder;
+        }
+
+        public void setClient(AutoCloseable client) {
+            this.client = client;
+        }
+
+        @Override
+        public void onMessage(ExecutionServiceOutput message) {
+            logger.info("Received payload from CDS: {}", message);
+            EventType eventType = message.getStatus().getEventType();
+
+            if (eventType == EventType.EVENT_COMPONENT_PROCESSING) {
+                return;
+            }
+
+            String status = eventType == EventType.EVENT_COMPONENT_EXECUTED ? SUCCESS : FAILED;
+            messageCorrelationBuilder.setVariable(CDS_STATUS, status);
+            messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, message.getStatus().getErrorMessage());
+
+            if (message.hasPayload()) {
+                try {
+                    String payload = JsonFormat.printer().print(message.getPayload());
+                    messageCorrelationBuilder.setVariable(RESPONSE_PAYLOAD, payload);
+                } catch (InvalidProtocolBufferException e) {
+                    logger.error("Failed parsing cds response", e);
+                }
+            }
+            correlate();
+        }
+
+        @Override
+        public void onError(Throwable t) {
+            logger.error("Failed sending CDS request", t);
+            messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, t.getMessage());
+            messageCorrelationBuilder.setVariable(CDS_STATUS, FAILED);
+            correlate();
+        }
+
+        /**
+         * When a CDS call returns before the bpmn process is in a waiting state, message correlation will fail. This
+         * retry logic will allow camunda some time to finish transitioning the process.
+         */
+        private void correlate() {
+            try {
+                int remainingTries = 10;
+                while (!tryCorrelateMessage() && remainingTries > 0) {
+                    logger.warn("Message correlation failed. Retries remaining: {}", remainingTries);
+                    remainingTries--;
+                    Thread.sleep(1000L);
+                }
+            } catch (InterruptedException e) {
+                logger.error("Thread interrupted during message correlation", e);
+                Thread.currentThread().interrupt();
+            } finally {
+                closeClient();
+            }
+        }
+
+        private boolean tryCorrelateMessage() {
+            try {
+                messageCorrelationBuilder.correlate();
+                logger.info("Message correlation successful");
+                return true;
+            } catch (MismatchingMessageCorrelationException e) {
+                return false;
+            }
+        }
+
+        private void closeClient() {
+            if (client == null)
+                throw new IllegalStateException("Client was not set and could not be closed");
+            try {
+                client.close();
+            } catch (Exception e) {
+                logger.error("Failed closing cds client", e);
+            }
+        }
+    }
+
+    private class CDSResponse {
+
+        String status;
+        String errorMessage;
+        Struct payload;
+
+        @Override
+        public String toString() {
+            return "CDSResponse{" + "status='" + status + '\'' + ", errorMessage='" + errorMessage + '\'' + ", payload="
+                    + payload + '}';
+        }
+    }
 }