+ private class MessageSendingHandler implements CDSProcessingListener {
+
+ private MessageCorrelationBuilder messageCorrelationBuilder;
+ private AutoCloseable client;
+ private Logger logger = LoggerFactory.getLogger(MessageSendingHandler.class);
+
+ MessageSendingHandler(MessageCorrelationBuilder messageCorrelationBuilder) {
+ this.messageCorrelationBuilder = messageCorrelationBuilder;
+ }
+
+ public void setClient(AutoCloseable client) {
+ this.client = client;
+ }
+
+ @Override
+ public void onMessage(ExecutionServiceOutput message) {
+ logger.info("Received payload from CDS: {}", message);
+ EventType eventType = message.getStatus().getEventType();
+
+ if (eventType == EventType.EVENT_COMPONENT_PROCESSING) {
+ return;
+ }
+
+ String status = eventType == EventType.EVENT_COMPONENT_EXECUTED ? SUCCESS : FAILED;
+ messageCorrelationBuilder.setVariable(CDS_STATUS, status);
+ messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, message.getStatus().getErrorMessage());
+
+ if (message.hasPayload()) {
+ try {
+ String payload = JsonFormat.printer().print(message.getPayload());
+ messageCorrelationBuilder.setVariable(RESPONSE_PAYLOAD, payload);
+ } catch (InvalidProtocolBufferException e) {
+ logger.error("Failed parsing cds response", e);
+ }
+ }
+ correlate();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ logger.error("Failed sending CDS request", t);
+ messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, t.getMessage());
+ messageCorrelationBuilder.setVariable(CDS_STATUS, FAILED);
+ correlate();
+ }
+
+ /**
+ * When a CDS call returns before the bpmn process is in a waiting state, message correlation will fail. This
+ * retry logic will allow camunda some time to finish transitioning the process.
+ */
+ private void correlate() {
+ try {
+ int remainingTries = 10;
+ while (!tryCorrelateMessage() && remainingTries > 0) {
+ logger.warn("Message correlation failed. Retries remaining: {}", remainingTries);
+ remainingTries--;
+ Thread.sleep(1000L);
+ }
+ } catch (InterruptedException e) {
+ logger.error("Thread interrupted during message correlation", e);
+ Thread.currentThread().interrupt();
+ } finally {
+ closeClient();
+ }
+ }
+
+ private boolean tryCorrelateMessage() {
+ try {
+ messageCorrelationBuilder.correlate();
+ logger.info("Message correlation successful");
+ return true;
+ } catch (MismatchingMessageCorrelationException e) {
+ return false;
+ }
+ }
+
+ private void closeClient() {
+ if (client == null)
+ throw new IllegalStateException("Client was not set and could not be closed");
+ try {
+ client.close();
+ } catch (Exception e) {
+ logger.error("Failed closing cds client", e);
+ }
+ }
+ }
+