X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=bpmn%2FMSOCommonBPMN%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fso%2Fclient%2Fcds%2FAbstractCDSProcessingBBUtils.java;h=3ed1011ee99971febd0198379f7e24c45eb4ef29;hb=5baa1ed97c1d2b98952a025c3bc76f60587e9670;hp=e5d8a921a56d1b7f54f74f5a1dbeb8c35e76b916;hpb=366a173f798422b956625aa83d81fc863e0914a5;p=so.git diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java index e5d8a921a5..3ed1011ee9 100644 --- a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java +++ b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java @@ -22,17 +22,24 @@ package org.onap.so.client.cds; +import static org.onap.so.client.cds.PayloadConstants.CONTROLLER_ERROR_MESSAGE; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Struct; import com.google.protobuf.Struct.Builder; import com.google.protobuf.util.JsonFormat; import io.grpc.Status; +import java.util.UUID; +import org.apache.commons.lang3.StringUtils; +import org.camunda.bpm.engine.MismatchingMessageCorrelationException; +import org.camunda.bpm.engine.ProcessEngine; import org.camunda.bpm.engine.delegate.DelegateExecution; +import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder; import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader; import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.logging.filter.base.ONAPComponents; import org.onap.so.bpmn.common.BuildingBlockExecution; import org.onap.so.client.PreconditionFailedException; import org.onap.so.client.RestPropertiesLoader; @@ -62,10 +69,17 @@ public class AbstractCDSProcessingBBUtils { private static final String EXEC_INPUT = "executionServiceInput"; private static final String EXECUTION_OBJECT = "executionObject"; private static final String EXCEPTION = "Exception"; + private static final String CDS_REQUEST_ID = "CDS_REQUEST_ID"; + private static final String CONTROLLER_MESSAGE = "ControllerMessage"; + + private static final String REQ_ID = "requestId"; @Autowired protected ExceptionBuilder exceptionUtil; + @Autowired + private ProcessEngine processEngine; + /** * Extracting data from execution object and building the ExecutionServiceInput Object * @@ -132,23 +146,28 @@ public class AbstractCDSProcessingBBUtils { } /** - * get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period + * get the executionServiceInput object from execution and send a request to CDS Client * * @param execution BuildingBlockExecution object */ public void sendRequestToCDSClientBB(BuildingBlockExecution execution) { - logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for BuildingBlockExecution object."); try { ExecutionServiceInput executionServiceInput = execution.getVariable(EXEC_INPUT); - CDSResponse cdsResponse = getCdsResponse(executionServiceInput); - execution.setVariable(CDS_STATUS, cdsResponse.status); - if (cdsResponse.payload != null) { - String payload = JsonFormat.printer().print(cdsResponse.payload); - execution.setVariable(RESPONSE_PAYLOAD, payload); + String messageCorrelationId = executionServiceInput.getCommonHeader().getSubRequestId(); + if (StringUtils.isBlank(messageCorrelationId)) { + throw new IllegalArgumentException("subRequestId can not be blank"); } - + execution.setVariable(CDS_REQUEST_ID, messageCorrelationId); + + MessageCorrelationBuilder messageCorrelationBuilder = + processEngine.getRuntimeService().createMessageCorrelation(CONTROLLER_MESSAGE) + .processInstanceVariableEquals(CDS_REQUEST_ID, messageCorrelationId); + MessageSendingHandler handler = new MessageSendingHandler(messageCorrelationBuilder); + CDSProcessingClient client = new CDSProcessingClient(handler); + handler.setClient(client); + client.sendRequest(executionServiceInput); } catch (Exception ex) { exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex); } @@ -249,6 +268,93 @@ public class AbstractCDSProcessingBBUtils { } } + private class MessageSendingHandler implements CDSProcessingListener { + + private MessageCorrelationBuilder messageCorrelationBuilder; + private AutoCloseable client; + private Logger logger = LoggerFactory.getLogger(MessageSendingHandler.class); + + MessageSendingHandler(MessageCorrelationBuilder messageCorrelationBuilder) { + this.messageCorrelationBuilder = messageCorrelationBuilder; + } + + public void setClient(AutoCloseable client) { + this.client = client; + } + + @Override + public void onMessage(ExecutionServiceOutput message) { + logger.info("Received payload from CDS: {}", message); + EventType eventType = message.getStatus().getEventType(); + + if (eventType == EventType.EVENT_COMPONENT_PROCESSING) { + return; + } + + String status = eventType == EventType.EVENT_COMPONENT_EXECUTED ? SUCCESS : FAILED; + messageCorrelationBuilder.setVariable(CDS_STATUS, status); + messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, message.getStatus().getErrorMessage()); + + if (message.hasPayload()) { + try { + String payload = JsonFormat.printer().print(message.getPayload()); + messageCorrelationBuilder.setVariable(RESPONSE_PAYLOAD, payload); + } catch (InvalidProtocolBufferException e) { + logger.error("Failed parsing cds response", e); + } + } + correlate(); + } + + @Override + public void onError(Throwable t) { + logger.error("Failed sending CDS request", t); + messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, t.getMessage()); + messageCorrelationBuilder.setVariable(CDS_STATUS, FAILED); + correlate(); + } + + /** + * When a CDS call returns before the bpmn process is in a waiting state, message correlation will fail. This + * retry logic will allow camunda some time to finish transitioning the process. + */ + private void correlate() { + try { + int remainingTries = 10; + while (!tryCorrelateMessage() && remainingTries > 0) { + logger.warn("Message correlation failed. Retries remaining: {}", remainingTries); + remainingTries--; + Thread.sleep(1000L); + } + } catch (InterruptedException e) { + logger.error("Thread interrupted during message correlation", e); + Thread.currentThread().interrupt(); + } finally { + closeClient(); + } + } + + private boolean tryCorrelateMessage() { + try { + messageCorrelationBuilder.correlate(); + logger.info("Message correlation successful"); + return true; + } catch (MismatchingMessageCorrelationException e) { + return false; + } + } + + private void closeClient() { + if (client == null) + throw new IllegalStateException("Client was not set and could not be closed"); + try { + client.close(); + } catch (Exception e) { + logger.error("Failed closing cds client", e); + } + } + } + private class CDSResponse { String status;