Enable long-running processes in ControllerExecutionBB
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / onap / so / client / cds / AbstractCDSProcessingBBUtils.java
index e5d8a92..3ed1011 100644 (file)
 
 package org.onap.so.client.cds;
 
+import static org.onap.so.client.cds.PayloadConstants.CONTROLLER_ERROR_MESSAGE;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Struct;
 import com.google.protobuf.Struct.Builder;
 import com.google.protobuf.util.JsonFormat;
 import io.grpc.Status;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
+import org.camunda.bpm.engine.ProcessEngine;
 import org.camunda.bpm.engine.delegate.DelegateExecution;
+import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.logging.filter.base.ONAPComponents;
 import org.onap.so.bpmn.common.BuildingBlockExecution;
 import org.onap.so.client.PreconditionFailedException;
 import org.onap.so.client.RestPropertiesLoader;
@@ -62,10 +69,17 @@ public class AbstractCDSProcessingBBUtils {
     private static final String EXEC_INPUT = "executionServiceInput";
     private static final String EXECUTION_OBJECT = "executionObject";
     private static final String EXCEPTION = "Exception";
+    private static final String CDS_REQUEST_ID = "CDS_REQUEST_ID";
+    private static final String CONTROLLER_MESSAGE = "ControllerMessage";
+
+    private static final String REQ_ID = "requestId";
 
     @Autowired
     protected ExceptionBuilder exceptionUtil;
 
+    @Autowired
+    private ProcessEngine processEngine;
+
     /**
      * Extracting data from execution object and building the ExecutionServiceInput Object
      *
@@ -132,23 +146,28 @@ public class AbstractCDSProcessingBBUtils {
     }
 
     /**
-     * get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period
+     * get the executionServiceInput object from execution and send a request to CDS Client
      *
      * @param execution BuildingBlockExecution object
      */
     public void sendRequestToCDSClientBB(BuildingBlockExecution execution) {
-
         logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for BuildingBlockExecution object.");
         try {
             ExecutionServiceInput executionServiceInput = execution.getVariable(EXEC_INPUT);
-            CDSResponse cdsResponse = getCdsResponse(executionServiceInput);
-            execution.setVariable(CDS_STATUS, cdsResponse.status);
 
-            if (cdsResponse.payload != null) {
-                String payload = JsonFormat.printer().print(cdsResponse.payload);
-                execution.setVariable(RESPONSE_PAYLOAD, payload);
+            String messageCorrelationId = executionServiceInput.getCommonHeader().getSubRequestId();
+            if (StringUtils.isBlank(messageCorrelationId)) {
+                throw new IllegalArgumentException("subRequestId can not be blank");
             }
-
+            execution.setVariable(CDS_REQUEST_ID, messageCorrelationId);
+
+            MessageCorrelationBuilder messageCorrelationBuilder =
+                    processEngine.getRuntimeService().createMessageCorrelation(CONTROLLER_MESSAGE)
+                            .processInstanceVariableEquals(CDS_REQUEST_ID, messageCorrelationId);
+            MessageSendingHandler handler = new MessageSendingHandler(messageCorrelationBuilder);
+            CDSProcessingClient client = new CDSProcessingClient(handler);
+            handler.setClient(client);
+            client.sendRequest(executionServiceInput);
         } catch (Exception ex) {
             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
         }
@@ -249,6 +268,93 @@ public class AbstractCDSProcessingBBUtils {
         }
     }
 
+    private class MessageSendingHandler implements CDSProcessingListener {
+
+        private MessageCorrelationBuilder messageCorrelationBuilder;
+        private AutoCloseable client;
+        private Logger logger = LoggerFactory.getLogger(MessageSendingHandler.class);
+
+        MessageSendingHandler(MessageCorrelationBuilder messageCorrelationBuilder) {
+            this.messageCorrelationBuilder = messageCorrelationBuilder;
+        }
+
+        public void setClient(AutoCloseable client) {
+            this.client = client;
+        }
+
+        @Override
+        public void onMessage(ExecutionServiceOutput message) {
+            logger.info("Received payload from CDS: {}", message);
+            EventType eventType = message.getStatus().getEventType();
+
+            if (eventType == EventType.EVENT_COMPONENT_PROCESSING) {
+                return;
+            }
+
+            String status = eventType == EventType.EVENT_COMPONENT_EXECUTED ? SUCCESS : FAILED;
+            messageCorrelationBuilder.setVariable(CDS_STATUS, status);
+            messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, message.getStatus().getErrorMessage());
+
+            if (message.hasPayload()) {
+                try {
+                    String payload = JsonFormat.printer().print(message.getPayload());
+                    messageCorrelationBuilder.setVariable(RESPONSE_PAYLOAD, payload);
+                } catch (InvalidProtocolBufferException e) {
+                    logger.error("Failed parsing cds response", e);
+                }
+            }
+            correlate();
+        }
+
+        @Override
+        public void onError(Throwable t) {
+            logger.error("Failed sending CDS request", t);
+            messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, t.getMessage());
+            messageCorrelationBuilder.setVariable(CDS_STATUS, FAILED);
+            correlate();
+        }
+
+        /**
+         * When a CDS call returns before the bpmn process is in a waiting state, message correlation will fail. This
+         * retry logic will allow camunda some time to finish transitioning the process.
+         */
+        private void correlate() {
+            try {
+                int remainingTries = 10;
+                while (!tryCorrelateMessage() && remainingTries > 0) {
+                    logger.warn("Message correlation failed. Retries remaining: {}", remainingTries);
+                    remainingTries--;
+                    Thread.sleep(1000L);
+                }
+            } catch (InterruptedException e) {
+                logger.error("Thread interrupted during message correlation", e);
+                Thread.currentThread().interrupt();
+            } finally {
+                closeClient();
+            }
+        }
+
+        private boolean tryCorrelateMessage() {
+            try {
+                messageCorrelationBuilder.correlate();
+                logger.info("Message correlation successful");
+                return true;
+            } catch (MismatchingMessageCorrelationException e) {
+                return false;
+            }
+        }
+
+        private void closeClient() {
+            if (client == null)
+                throw new IllegalStateException("Client was not set and could not be closed");
+            try {
+                client.close();
+            } catch (Exception e) {
+                logger.error("Failed closing cds client", e);
+            }
+        }
+    }
+
     private class CDSResponse {
 
         String status;