package org.onap.so.client.cds;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import static org.onap.so.client.cds.PayloadConstants.CONTROLLER_ERROR_MESSAGE;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Struct.Builder;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.Status;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
+import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.delegate.DelegateExecution;
+import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder;
import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.logging.filter.base.ONAPComponents;
+import org.onap.so.bpmn.common.BuildingBlockExecution;
import org.onap.so.client.PreconditionFailedException;
import org.onap.so.client.RestPropertiesLoader;
import org.onap.so.client.cds.beans.AbstractCDSPropertiesBean;
+import org.onap.so.client.exception.BadResponseException;
import org.onap.so.client.exception.ExceptionBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Struct;
-import com.google.protobuf.Struct.Builder;
-import com.google.protobuf.util.JsonFormat;
-import io.grpc.Status;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* Util class to support Call to CDS client
- *
*/
@Component
-public class AbstractCDSProcessingBBUtils implements CDSProcessingListener {
+public class AbstractCDSProcessingBBUtils {
private static final Logger logger = LoggerFactory.getLogger(AbstractCDSProcessingBBUtils.class);
private static final String SUCCESS = "Success";
private static final String FAILED = "Failed";
private static final String PROCESSING = "Processing";
+ private static final String RESPONSE_PAYLOAD = "CDSResponsePayload";
+ private static final String CDS_STATUS = "ControllerStatus";
+ private static final String EXEC_INPUT = "executionServiceInput";
+ private static final String EXECUTION_OBJECT = "executionObject";
+ private static final String EXCEPTION = "Exception";
+ private static final String CDS_REQUEST_ID = "CDS_REQUEST_ID";
+ private static final String CONTROLLER_MESSAGE = "ControllerMessage";
+
+ private static final String REQ_ID = "requestId";
- private final AtomicReference<String> cdsResponse = new AtomicReference<>();
+ @Autowired
+ protected ExceptionBuilder exceptionUtil;
@Autowired
- private ExceptionBuilder exceptionUtil;
+ private ProcessEngine processEngine;
/**
* Extracting data from execution object and building the ExecutionServiceInput Object
- *
+ *
* @param execution DelegateExecution object
*/
public void constructExecutionServiceInputObject(DelegateExecution execution) {
- logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest ");
+ logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest for DelegateExecution object.");
try {
AbstractCDSPropertiesBean executionObject =
- (AbstractCDSPropertiesBean) execution.getVariable("executionObject");
+ (AbstractCDSPropertiesBean) execution.getVariable(EXECUTION_OBJECT);
- String payload = executionObject.getRequestObject();
+ ExecutionServiceInput executionServiceInput = prepareExecutionServiceInput(executionObject);
- CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(executionObject.getOriginatorId())
- .setRequestId(executionObject.getRequestId()).setSubRequestId(executionObject.getSubRequestId())
- .build();
- ActionIdentifiers actionIdentifiers =
- ActionIdentifiers.newBuilder().setBlueprintName(executionObject.getBlueprintName())
- .setBlueprintVersion(executionObject.getBlueprintVersion())
- .setActionName(executionObject.getActionName()).setMode(executionObject.getMode()).build();
+ execution.setVariable(EXEC_INPUT, executionServiceInput);
- Builder struct = Struct.newBuilder();
- try {
- JsonFormat.parser().merge(payload, struct);
- } catch (InvalidProtocolBufferException e) {
- logger.error("Failed to parse received message. blueprint({}:{}) for action({}). {}",
- executionObject.getBlueprintVersion(), executionObject.getBlueprintName(),
- executionObject.getActionName(), e);
- }
+ } catch (Exception ex) {
+ exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
+ }
+ }
- ExecutionServiceInput executionServiceInput =
- ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader)
- .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build();
+ /**
+ * Extracting data from execution object and building the ExecutionServiceInput Object
+ *
+ * @param execution BuildingBlockExecution object
+ */
+ public void constructExecutionServiceInputObjectBB(BuildingBlockExecution execution) {
+ logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest for BuildingBlockExecution object.");
+
+ try {
+ AbstractCDSPropertiesBean executionObject = execution.getVariable(EXECUTION_OBJECT);
- execution.setVariable("executionServiceInput", executionServiceInput);
+ ExecutionServiceInput executionServiceInput = prepareExecutionServiceInput(executionObject);
+
+ execution.setVariable(EXEC_INPUT, executionServiceInput);
+ logger.debug("Input payload: " + executionServiceInput.getPayload());
} catch (Exception ex) {
exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
/**
* get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period
- *
+ *
* @param execution DelegateExecution object
*/
public void sendRequestToCDSClient(DelegateExecution execution) {
- logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient ");
+ logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for DelegateExecution object.");
try {
- CDSProperties props = RestPropertiesLoader.getInstance().getNewImpl(CDSProperties.class);
- if (props == null) {
- throw new PreconditionFailedException(
- "No RestProperty.CDSProperties implementation found on classpath, can't create client.");
+ ExecutionServiceInput executionServiceInput = (ExecutionServiceInput) execution.getVariable(EXEC_INPUT);
+ CDSResponse cdsResponse = getCdsResponse(executionServiceInput);
+ execution.setVariable(CDS_STATUS, cdsResponse.status);
+
+ if (cdsResponse.payload != null) {
+ String payload = JsonFormat.printer().print(cdsResponse.payload);
+ execution.setVariable(RESPONSE_PAYLOAD, payload);
}
- ExecutionServiceInput executionServiceInput =
- (ExecutionServiceInput) execution.getVariable("executionServiceInput");
+ } catch (Exception ex) {
+ exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
+ }
+ }
- try (CDSProcessingClient cdsClient = new CDSProcessingClient(this)) {
- CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
- countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS);
- } catch (InterruptedException ex) {
- logger.error("Caught exception in sendRequestToCDSClient in AbstractCDSProcessingBBUtils : ", ex);
- Thread.currentThread().interrupt();
- }
+ /**
+ * get the executionServiceInput object from execution and send a request to CDS Client
+ *
+ * @param execution BuildingBlockExecution object
+ */
+ public void sendRequestToCDSClientBB(BuildingBlockExecution execution) {
+ logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for BuildingBlockExecution object.");
+ try {
+ ExecutionServiceInput executionServiceInput = execution.getVariable(EXEC_INPUT);
- if (cdsResponse != null) {
- execution.setVariable("CDSStatus", cdsResponse.get());
+ String messageCorrelationId = executionServiceInput.getCommonHeader().getSubRequestId();
+ if (StringUtils.isBlank(messageCorrelationId)) {
+ throw new IllegalArgumentException("subRequestId can not be blank");
}
+ execution.setVariable(CDS_REQUEST_ID, messageCorrelationId);
+ MessageCorrelationBuilder messageCorrelationBuilder =
+ processEngine.getRuntimeService().createMessageCorrelation(CONTROLLER_MESSAGE)
+ .processInstanceVariableEquals(CDS_REQUEST_ID, messageCorrelationId);
+ MessageSendingHandler handler = new MessageSendingHandler(messageCorrelationBuilder);
+ CDSProcessingClient client = new CDSProcessingClient(handler);
+ handler.setClient(client);
+ client.sendRequest(executionServiceInput);
} catch (Exception ex) {
exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
}
}
- /**
- * Get Response from CDS Client
- *
- */
- @Override
- public void onMessage(ExecutionServiceOutput message) {
- logger.info("Received notification from CDS: {}", message);
- EventType eventType = message.getStatus().getEventType();
-
- switch (eventType) {
-
- case EVENT_COMPONENT_FAILURE:
- // failed processing with failure
- cdsResponse.set(FAILED);
- break;
- case EVENT_COMPONENT_PROCESSING:
- // still processing
- cdsResponse.set(PROCESSING);
- break;
- case EVENT_COMPONENT_EXECUTED:
- // done with async processing
- cdsResponse.set(SUCCESS);
- break;
- default:
- cdsResponse.set(FAILED);
- break;
+ private CDSResponse getCdsResponse(ExecutionServiceInput executionServiceInput) throws BadResponseException {
+ CDSProperties props = RestPropertiesLoader.getInstance().getNewImpl(CDSProperties.class);
+ if (props == null) {
+ throw new PreconditionFailedException(
+ "No RestProperty.CDSProperties implementation found on classpath, can't create client.");
+ }
+
+ CDSResponse cdsResponse = new CDSResponse();
+
+ try (CDSProcessingClient cdsClient = new CDSProcessingClient(new ResponseHandler(cdsResponse))) {
+ CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
+ countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS);
+ } catch (InterruptedException ex) {
+ logger.error("Caught exception in sendRequestToCDSClient in AbstractCDSProcessingBBUtils : ", ex);
+ Thread.currentThread().interrupt();
}
+ String cdsResponseStatus = cdsResponse.status;
+
+ /**
+ * throw CDS failed exception.
+ */
+ if (!cdsResponseStatus.equals(SUCCESS)) {
+ throw new BadResponseException("CDS call failed with status: " + cdsResponse.status + " and errorMessage: "
+ + cdsResponse.errorMessage);
+ }
+ return cdsResponse;
}
- /**
- * On error at CDS, log the error
- */
- @Override
- public void onError(Throwable t) {
- Status status = Status.fromThrowable(t);
- logger.error("Failed processing blueprint {}", status, t);
+ private ExecutionServiceInput prepareExecutionServiceInput(AbstractCDSPropertiesBean executionObject) {
+ String payload = executionObject.getRequestObject();
+
+ CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(executionObject.getOriginatorId())
+ .setRequestId(executionObject.getRequestId()).setSubRequestId(executionObject.getSubRequestId())
+ .build();
+ ActionIdentifiers actionIdentifiers =
+ ActionIdentifiers.newBuilder().setBlueprintName(executionObject.getBlueprintName())
+ .setBlueprintVersion(executionObject.getBlueprintVersion())
+ .setActionName(executionObject.getActionName()).setMode(executionObject.getMode()).build();
+
+ Builder struct = Struct.newBuilder();
+ try {
+ JsonFormat.parser().merge(payload, struct);
+ } catch (InvalidProtocolBufferException e) {
+ logger.error("Failed to parse received message. blueprint({}:{}) for action({}). {}",
+ executionObject.getBlueprintVersion(), executionObject.getBlueprintName(),
+ executionObject.getActionName(), e);
+ }
+
+ return ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader).setActionIdentifiers(actionIdentifiers)
+ .setPayload(struct.build()).build();
+ }
+
+ private class ResponseHandler implements CDSProcessingListener {
+
+ private CDSResponse cdsResponse;
+
+ ResponseHandler(CDSResponse cdsResponse) {
+ this.cdsResponse = cdsResponse;
+ }
+
+ /**
+ * Get Response from CDS Client
+ */
+ @Override
+ public void onMessage(ExecutionServiceOutput message) {
+ logger.info("Received notification from CDS: {}", message);
+ EventType eventType = message.getStatus().getEventType();
+
+ switch (eventType) {
+ case EVENT_COMPONENT_PROCESSING:
+ cdsResponse.status = PROCESSING;
+ break;
+ case EVENT_COMPONENT_EXECUTED:
+ cdsResponse.status = SUCCESS;
+ break;
+ default:
+ cdsResponse.status = FAILED;
+ cdsResponse.errorMessage = message.getStatus().getErrorMessage();
+ break;
+ }
+ cdsResponse.payload = message.getPayload();
+ }
+
+ /**
+ * On error at CDS, log the error
+ */
+ @Override
+ public void onError(Throwable t) {
+ Status status = Status.fromThrowable(t);
+ logger.error("Failed processing blueprint {}", status, t);
+ cdsResponse.status = EXCEPTION;
+ }
+ }
+
+ private class MessageSendingHandler implements CDSProcessingListener {
+
+ private MessageCorrelationBuilder messageCorrelationBuilder;
+ private AutoCloseable client;
+ private Logger logger = LoggerFactory.getLogger(MessageSendingHandler.class);
+
+ MessageSendingHandler(MessageCorrelationBuilder messageCorrelationBuilder) {
+ this.messageCorrelationBuilder = messageCorrelationBuilder;
+ }
+
+ public void setClient(AutoCloseable client) {
+ this.client = client;
+ }
+
+ @Override
+ public void onMessage(ExecutionServiceOutput message) {
+ logger.info("Received payload from CDS: {}", message);
+ EventType eventType = message.getStatus().getEventType();
+
+ if (eventType == EventType.EVENT_COMPONENT_PROCESSING) {
+ return;
+ }
+
+ String status = eventType == EventType.EVENT_COMPONENT_EXECUTED ? SUCCESS : FAILED;
+ messageCorrelationBuilder.setVariable(CDS_STATUS, status);
+ messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, message.getStatus().getErrorMessage());
+
+ if (message.hasPayload()) {
+ try {
+ String payload = JsonFormat.printer().print(message.getPayload());
+ messageCorrelationBuilder.setVariable(RESPONSE_PAYLOAD, payload);
+ } catch (InvalidProtocolBufferException e) {
+ logger.error("Failed parsing cds response", e);
+ }
+ }
+ correlate();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ logger.error("Failed sending CDS request", t);
+ messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, t.getMessage());
+ messageCorrelationBuilder.setVariable(CDS_STATUS, FAILED);
+ correlate();
+ }
+
+ /**
+ * When a CDS call returns before the bpmn process is in a waiting state, message correlation will fail. This
+ * retry logic will allow camunda some time to finish transitioning the process.
+ */
+ private void correlate() {
+ try {
+ int remainingTries = 10;
+ while (!tryCorrelateMessage() && remainingTries > 0) {
+ logger.warn("Message correlation failed. Retries remaining: {}", remainingTries);
+ remainingTries--;
+ Thread.sleep(1000L);
+ }
+ } catch (InterruptedException e) {
+ logger.error("Thread interrupted during message correlation", e);
+ Thread.currentThread().interrupt();
+ } finally {
+ closeClient();
+ }
+ }
+
+ private boolean tryCorrelateMessage() {
+ try {
+ messageCorrelationBuilder.correlate();
+ logger.info("Message correlation successful");
+ return true;
+ } catch (MismatchingMessageCorrelationException e) {
+ return false;
+ }
+ }
+
+ private void closeClient() {
+ if (client == null)
+ throw new IllegalStateException("Client was not set and could not be closed");
+ try {
+ client.close();
+ } catch (Exception e) {
+ logger.error("Failed closing cds client", e);
+ }
+ }
}
+ private class CDSResponse {
+
+ String status;
+ String errorMessage;
+ Struct payload;
+
+ @Override
+ public String toString() {
+ return "CDSResponse{" + "status='" + status + '\'' + ", errorMessage='" + errorMessage + '\'' + ", payload="
+ + payload + '}';
+ }
+ }
}