import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import lombok.Getter;
-import org.apache.commons.lang3.tuple.Triple;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
-import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Operation that uses a Topic pair.
+ * Operation that uses a bidirectional topic.
*
* @param <S> response type
*/
@Getter
-public abstract class TopicPairOperation<Q, S> extends OperationPartial {
- private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class);
- private static final Coder coder = new StandardCoder();
+public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial {
+ private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class);
+
+ /**
+ * Response status.
+ */
+ public enum Status {
+ SUCCESS, FAILURE, STILL_WAITING
+ }
// fields extracted from the operator
- private final TopicPair topicPair;
+ private final BidirectionalTopicHandler topicHandler;
private final Forwarder forwarder;
- private final TopicPairParams pairParams;
+ private final BidirectionalTopicParams topicParams;
private final long timeoutMs;
/**
* @param operator operator that created this operation
* @param clazz response class
*/
- public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) {
+ public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicOperator operator,
+ Class<S> clazz) {
super(params, operator);
- this.topicPair = operator.getTopicPair();
+ this.topicHandler = operator.getTopicHandler();
this.forwarder = operator.getForwarder();
- this.pairParams = operator.getParams();
+ this.topicParams = operator.getParams();
this.responseClass = clazz;
- this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS);
+ this.timeoutMs = TimeUnit.MILLISECONDS.convert(topicParams.getTimeoutSec(), TimeUnit.SECONDS);
}
/**
final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future =
- new CompletableFuture<>();
final Executor executor = params.getExecutor();
// register a listener BEFORE publishing
- // @formatter:off
- TriConsumer<CommInfrastructure, String, StandardCoderObject> listener =
- (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse));
- // @formatter:on
-
- // TODO this currently only allows a single matching response
+ BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
+ OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
+ if (latestOutcome != null) {
+ // final response - complete the controller
+ controller.completeAsync(() -> latestOutcome, executor);
+ }
+ };
forwarder.register(expectedKeyValues, listener);
throw e;
}
-
- // once "future" completes, process the response, and then complete the controller
-
- // @formatter:off
- future.thenApplyAsync(
- triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()),
- executor)
- .whenCompleteAsync(controller.delayedComplete(), executor);
- // @formatter:on
-
return controller;
}
throw new IllegalArgumentException("cannot encode request", e);
}
- List<CommInfrastructure> list = topicPair.publish(json);
- if (list.isEmpty()) {
+ if (!topicHandler.send(json)) {
throw new IllegalStateException("nothing published");
}
- logTopicRequest(list, request);
+ logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request);
}
/**
* @param outcome outcome to be populated
* @param response raw response to process
* @param scoResponse response, as a {@link StandardCoderObject}
- * @return the outcome
+ * @return the outcome, or {@code null} if still waiting for completion
*/
- protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse,
+ protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
StandardCoderObject scoResponse) {
logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
- logTopicResponse(infra, rawResponse);
+ logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(),
+ rawResponse);
+ // decode the response
S response;
if (responseClass == String.class) {
response = responseClass.cast(rawResponse);
}
}
- if (!isSuccess(rawResponse, response)) {
- logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
- params.getRequestId());
- return setOutcome(outcome, PolicyResult.FAILURE);
- }
+ // check its status
+ switch (detmStatus(rawResponse, response)) {
+ case SUCCESS:
+ logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ setOutcome(outcome, PolicyResult.SUCCESS);
+ postProcessResponse(outcome, rawResponse, response);
+ return outcome;
- logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
- setOutcome(outcome, PolicyResult.SUCCESS);
- postProcessResponse(outcome, rawResponse, response);
+ case FAILURE:
+ logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ return setOutcome(outcome, PolicyResult.FAILURE);
- return outcome;
+ case STILL_WAITING:
+ default:
+ logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ return null;
+ }
}
/**
}
/**
- * Determines if the response indicates success.
+ * Determines the status of the response.
*
* @param rawResponse raw response
* @param response decoded response
- * @return {@code true} if the response indicates success, {@code false} otherwise
- */
- protected abstract boolean isSuccess(String rawResponse, S response);
-
- /**
- * Logs a TOPIC request. If the request is not of type, String, then it attempts to
- * pretty-print it into JSON before logging.
- *
- * @param infrastructures list of communication infrastructures on which it was
- * published
- * @param request request to be logged
- */
- protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) {
- if (infrastructures.isEmpty()) {
- return;
- }
-
- String json;
- try {
- if (request == null) {
- json = null;
- } else if (request instanceof String) {
- json = request.toString();
- } else {
- json = makeCoder().encode(request, true);
- }
-
- } catch (CoderException e) {
- logger.warn("cannot pretty-print request", e);
- json = request.toString();
- }
-
- for (CommInfrastructure infra : infrastructures) {
- logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json);
- }
- }
-
- /**
- * Logs a TOPIC response. If the response is not of type, String, then it attempts to
- * pretty-print it into JSON before logging.
- *
- * @param infra communication infrastructure on which the response was received
- * @param response response to be logged
+ * @return the status of the response
*/
- protected <T> void logTopicResponse(CommInfrastructure infra, T response) {
- String json;
- try {
- if (response == null) {
- json = null;
- } else if (response instanceof String) {
- json = response.toString();
- } else {
- json = makeCoder().encode(response, true);
- }
-
- } catch (CoderException e) {
- logger.warn("cannot pretty-print response", e);
- json = response.toString();
- }
-
- logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json);
- }
-
- // these may be overridden by junit tests
-
- protected Coder makeCoder() {
- return coder;
- }
+ protected abstract Status detmStatus(String rawResponse, S response);
}