package org.onap.policy.controlloop.actorserviceprovider.impl;
-import java.time.Instant;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.function.Function;
+import lombok.AccessLevel;
import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.Operator;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
-import org.onap.policy.controlloop.policy.Policy;
import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Partial implementation of an operator. Subclasses can choose to simply implement
- * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override
- * {@link #doOperationAsFuture(ControlLoopOperationParams)}.
+ * Partial implementation of an operator. In general, it's preferable that subclasses
+ * would override
+ * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome)
+ * startOperationAsync()}. However, if that proves to be too difficult, then they can
+ * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome)
+ * doOperation()}. In addition, if the operation requires any preprocessor steps, the
+ * subclass may choose to override
+ * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}.
+ * <p/>
+ * The futures returned by the methods within this class can be canceled, and will
+ * propagate the cancellation to any subtasks. Thus it is also expected that any futures
+ * returned by overridden methods will do the same. Of course, if a class overrides
+ * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()},
+ * then there's little that can be done to cancel that particular operation.
*/
public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
- private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString();
- private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString();
- private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString();
+ /**
+ * Executor to be used for tasks that may perform blocking I/O. The default executor
+ * simply launches a new thread for each command that is submitted to it.
+ * <p/>
+ * May be overridden by junit tests.
+ */
+ @Getter(AccessLevel.PROTECTED)
+ @Setter(AccessLevel.PROTECTED)
+ private Executor blockingExecutor = command -> {
+ Thread thread = new Thread(command);
+ thread.setDaemon(true);
+ thread.start();
+ };
@Getter
private final String actorName;
}
@Override
- public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) {
+ public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) {
if (!isAlive()) {
throw new IllegalStateException("operation is not running: " + getFullName());
}
- final Executor executor = params.getExecutor();
-
// allocate a controller for the entire operation
- final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params);
+ CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params);
if (preproc == null) {
// no preprocessor required - just start the operation
return startOperationAttempt(params, controller, 1);
}
- // propagate "stop" to the preprocessor
- controller.add(preproc);
-
/*
* Do preprocessor first and then, if successful, start the operation. Note:
* operations create their own outcome, ignoring the outcome from any previous
* steps.
+ *
+ * Wrap the preprocessor to ensure "stop" is propagated to it.
*/
- preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor)
- .thenComposeAsync(handleFailure(params, controller), executor)
- .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)),
- executor);
-
- return controller;
- }
-
- /**
- * Starts an operation's preprocessor step(s). If the preprocessor fails, then it
- * invokes the started and completed call-backs.
- *
- * @param params operation parameters
- * @return a future that will return the preprocessor outcome, or {@code null} if this
- * operation needs no preprocessor
- */
- protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) {
- logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId());
-
- final Executor executor = params.getExecutor();
- final ControlLoopOperation operation = params.makeOutcome();
-
- final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
- doPreprocessorAsFuture(params);
- if (preproc == null) {
- // no preprocessor required
- return null;
- }
-
- // allocate a controller for the preprocessor steps
- final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
-
- /*
- * Don't mark it complete until we've built the whole pipeline. This will prevent
- * the operation from starting until after it has been successfully built (i.e.,
- * without generating any exceptions).
- */
- final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
-
// @formatter:off
- firstFuture
- .thenComposeAsync(controller.add(preproc), executor)
- .exceptionally(fromException(params, operation))
- .whenCompleteAsync(controller.delayedComplete(), executor);
+ controller.wrap(preproc)
+ .exceptionally(fromException(params, "preprocessor of operation"))
+ .thenCompose(handlePreprocessorFailure(params, controller))
+ .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1));
// @formatter:on
- // start the pipeline
- firstFuture.complete(operation);
-
return controller;
}
/**
* Handles a failure in the preprocessor pipeline. If a failure occurred, then it
- * invokes the call-backs and returns a failed outcome. Otherwise, it returns the
- * outcome that it received.
+ * invokes the call-backs, marks the controller complete, and returns an incomplete
+ * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
+ * received.
+ * <p/>
+ * Assumes that no callbacks have been invoked yet.
*
* @param params operation parameters
* @param controller pipeline controller
* @return a function that checks the outcome status and continues, if successful, or
* indicates a failure otherwise
*/
- private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure(
- ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) {
+ private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) {
return outcome -> {
// propagate "stop" to the callbacks
controller.add(callbacks);
- final ControlLoopOperation outcome2 = params.makeOutcome();
+ final OperationOutcome outcome2 = params.makeOutcome();
// TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
- outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString());
+ outcome2.setResult(PolicyResult.FAILURE_GUARD);
outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
- CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor)
- .thenApplyAsync(callbackCompleted(params, callbacks), executor)
- .whenCompleteAsync(controller.delayedRemove(callbacks), executor)
+ // @formatter:off
+ CompletableFuture.completedFuture(outcome2)
+ .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+ .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
.whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
- return controller;
+ return new CompletableFuture<>();
};
}
* @return a function that will start the preprocessor and returns its outcome, or
* {@code null} if this operation needs no preprocessor
*/
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
- ControlLoopOperationParams params) {
+ protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
return null;
}
* @param attempt attempt number, typically starting with 1
* @return a future that will return the final result of all attempts
*/
- private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params,
- PipelineControllerFuture<ControlLoopOperation> controller, int attempt) {
-
- final Executor executor = params.getExecutor();
-
- CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt);
+ private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params,
+ PipelineControllerFuture<OperationOutcome> controller, int attempt) {
// propagate "stop" to the operation attempt
- controller.add(future);
-
- // detach when complete
- future.whenCompleteAsync(controller.delayedRemove(future), executor)
- .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor())
- .whenCompleteAsync(controller.delayedComplete(), executor);
+ controller.wrap(startAttemptWithoutRetries(params, attempt))
+ .thenCompose(retryOnFailure(params, controller, attempt));
return controller;
}
* @param attempt attempt number, typically starting with 1
* @return a future that will return the result of a single operation attempt
*/
- private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params,
+ private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params,
int attempt) {
logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
final Executor executor = params.getExecutor();
- final ControlLoopOperation outcome = params.makeOutcome();
+ final OperationOutcome outcome = params.makeOutcome();
final CallbackManager callbacks = new CallbackManager();
// this operation attempt gets its own controller
- final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
// propagate "stop" to the callbacks
controller.add(callbacks);
- /*
- * Don't mark it complete until we've built the whole pipeline. This will prevent
- * the operation from starting until after it has been successfully built (i.e.,
- * without generating any exceptions).
- */
- final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
-
// @formatter:off
- CompletableFuture<ControlLoopOperation> future2 =
- firstFuture.thenComposeAsync(verifyRunning(controller, params), executor)
- .thenApplyAsync(callbackStarted(params, callbacks), executor)
- .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor);
+ CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
+ .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+ .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2)));
// @formatter:on
// handle timeouts, if specified
- long timeoutMillis = getTimeOutMillis(params.getPolicy());
+ long timeoutMillis = getTimeOutMillis(params.getTimeoutSec());
if (timeoutMillis > 0) {
logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
- future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+ future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
}
/*
*/
// @formatter:off
- future2.exceptionally(fromException(params, outcome))
- .thenApplyAsync(setRetryFlag(params, attempt), executor)
- .thenApplyAsync(callbackStarted(params, callbacks), executor)
- .thenApplyAsync(callbackCompleted(params, callbacks), executor)
+ future.exceptionally(fromException(params, "operation"))
+ .thenApply(setRetryFlag(params, attempt))
+ .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+ .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
.whenCompleteAsync(controller.delayedComplete(), executor);
// @formatter:on
- // start the pipeline
- firstFuture.complete(outcome);
-
return controller;
}
* @param outcome outcome to examine
* @return {@code true} if the outcome was successful
*/
- protected boolean isSuccess(ControlLoopOperation outcome) {
- return OUTCOME_SUCCESS.equals(outcome.getOutcome());
+ protected boolean isSuccess(OperationOutcome outcome) {
+ return (outcome.getResult() == PolicyResult.SUCCESS);
}
/**
* @return {@code true} if the outcome is not {@code null} and was a failure
* <i>and</i> was associated with this operator, {@code false} otherwise
*/
- protected boolean isActorFailed(ControlLoopOperation outcome) {
- return OUTCOME_FAILURE.equals(getActorOutcome(outcome));
+ protected boolean isActorFailed(OperationOutcome outcome) {
+ return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
+ }
+
+ /**
+ * Determines if the given outcome is for this operation.
+ *
+ * @param outcome outcome to examine
+ * @return {@code true} if the outcome is for this operation, {@code false} otherwise
+ */
+ protected boolean isSameOperation(OperationOutcome outcome) {
+ return OperationOutcome.isFor(outcome, getActorName(), getName());
}
/**
* Invokes the operation as a "future". This method simply invokes
- * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future".
+ * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor
+ * "blocking executor"}, returning the result via a "future".
+ * <p/>
+ * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
+ * the executor in the "params", as that may bring the background thread pool to a
+ * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
+ * instead.
* <p/>
* This method assumes the following:
* <ul>
* @return a function that will start the operation and return its result when
* complete
*/
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
- ControlLoopOperationParams params, int attempt) {
-
- /*
- * TODO As doOperation() may perform blocking I/O, this should be launched in its
- * own thread to prevent the ForkJoinPool from being tied up. Should probably
- * provide a method to make that easy.
- */
+ protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt,
+ OperationOutcome outcome) {
- return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation),
- params.getExecutor());
+ return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor());
}
/**
* Low-level method that performs the operation. This can make the same assumptions
* that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
- * method throws an {@link UnsupportedOperationException}.
+ * particular method simply throws an {@link UnsupportedOperationException}.
*
* @param params operation parameters
* @param attempt attempt number, typically starting with 1
* @param operation the operation being performed
* @return the outcome of the operation
*/
- protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
- ControlLoopOperation operation) {
+ protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) {
throw new UnsupportedOperationException("start operation " + getFullName());
}
* @param attempt latest attempt number, starting with 1
* @return a function to get the next future to execute
*/
- private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params,
- int attempt) {
+ private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) {
return operation -> {
if (operation != null && !isActorFailed(operation)) {
}
// get a non-null operation
- ControlLoopOperation oper2;
+ OperationOutcome oper2;
if (operation != null) {
oper2 = operation;
} else {
oper2 = params.makeOutcome();
- oper2.setOutcome(OUTCOME_FAILURE);
+ oper2.setResult(PolicyResult.FAILURE);
}
- if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0
- && attempt > params.getPolicy().getRetry()) {
+ Integer retry = params.getRetry();
+ if (retry != null && retry > 0 && attempt > retry) {
/*
* retries were specified and we've already tried them all - change to
* FAILURE_RETRIES
*/
logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
- oper2.setOutcome(OUTCOME_RETRIES);
+ oper2.setResult(PolicyResult.FAILURE_RETRIES);
}
return oper2;
* @param attempt latest attempt number, starting with 1
* @return a function to get the next future to execute
*/
- private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure(
- ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller,
+ private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
+ ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller,
int attempt) {
return operation -> {
if (!isActorFailed(operation)) {
// wrong type or wrong operation - just leave it as is
logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(operation);
+ controller.complete(operation);
+ return new CompletableFuture<>();
}
- if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) {
+ Integer retry = params.getRetry();
+ if (retry == null || retry <= 0) {
// no retries - already marked as FAILURE, so just return it
logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(operation);
+ controller.complete(operation);
+ return new CompletableFuture<>();
}
}
/**
- * Gets the outcome of an operation for this operation.
+ * Converts an exception into an operation outcome, returning a copy of the outcome to
+ * prevent background jobs from changing it.
*
- * @param operation operation whose outcome is to be extracted
- * @return the outcome of the given operation, if it's for this operator, {@code null}
- * otherwise
+ * @param params operation parameters
+ * @param type type of item throwing the exception
+ * @return a function that will convert an exception into an operation outcome
*/
- protected String getActorOutcome(ControlLoopOperation operation) {
- if (operation == null) {
- return null;
- }
+ private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) {
- if (!getActorName().equals(operation.getActor())) {
- return null;
- }
+ return thrown -> {
+ OperationOutcome outcome = params.makeOutcome();
+
+ logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
+ params.getRequestId(), thrown);
+
+ return setOutcome(params, outcome, thrown);
+ };
+ }
+
+ /**
+ * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
+ * any outstanding futures when one completes.
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
+ List<CompletableFuture<OperationOutcome>> futures) {
+
+ // convert list to an array
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures);
+ return result;
+ }
+
+ /**
+ * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
+ * outstanding futures when one completes.
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+ final Executor executor = params.getExecutor();
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ attachFutures(controller, futures);
+
+ // @formatter:off
+ CompletableFuture.anyOf(futures)
+ .thenApply(object -> (OperationOutcome) object)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ return controller;
+ }
+
+ /**
+ * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
+ * the futures if returned future is canceled. The future returns the "worst" outcome,
+ * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
+ List<CompletableFuture<OperationOutcome>> futures) {
- if (!getName().equals(operation.getOperation())) {
- return null;
+ // convert list to an array
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome> result = allOf(params, arrFutures);
+ return result;
+ }
+
+ /**
+ * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
+ * futures if returned future is canceled. The future returns the "worst" outcome,
+ * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ *
+ * @param params operation parameters
+ * @param futures futures for which to wait
+ * @return a future to cancel or await an outcome. If this future is canceled, then
+ * all of the futures will be canceled
+ */
+ protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ attachFutures(controller, futures);
+
+ OperationOutcome[] outcomes = new OperationOutcome[futures.length];
+
+ @SuppressWarnings("rawtypes")
+ CompletableFuture[] futures2 = new CompletableFuture[futures.length];
+
+ // record the outcomes of each future when it completes
+ for (int count = 0; count < futures2.length; ++count) {
+ final int count2 = count;
+ futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
}
- return operation.getOutcome();
+ CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes));
+
+ return controller;
}
/**
- * Gets a function that will start the next step, if the current operation was
- * successful, or just return the current operation, otherwise.
+ * Attaches the given futures to the controller.
+ *
+ * @param controller master controller for all of the futures
+ * @param futures futures to be attached to the controller
+ */
+ private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
+ @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+ // attach each task
+ for (CompletableFuture<OperationOutcome> future : futures) {
+ controller.add(future);
+ }
+ }
+
+ /**
+ * Combines the outcomes from a set of tasks.
*
* @param params operation parameters
- * @param nextStep function that will invoke the next step, passing it the operation
- * @return a function that will start the next step
+ * @param future future to be completed with the combined result
+ * @param outcomes outcomes to be examined
*/
- protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess(
- ControlLoopOperationParams params,
- Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) {
+ private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params,
+ CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) {
- return operation -> {
+ return (unused, thrown) -> {
+ if (thrown != null) {
+ future.completeExceptionally(thrown);
+ return;
+ }
- if (operation == null) {
- logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId());
- ControlLoopOperation outcome = params.makeOutcome();
- outcome.setOutcome(OUTCOME_FAILURE);
- return CompletableFuture.completedFuture(outcome);
+ // identify the outcome with the highest priority
+ OperationOutcome outcome = outcomes[0];
+ int priority = detmPriority(outcome);
- } else if (isSuccess(operation)) {
- logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId());
- return nextStep.apply(operation);
+ // start with "1", as we've already dealt with "0"
+ for (int count = 1; count < outcomes.length; ++count) {
+ OperationOutcome outcome2 = outcomes[count];
+ int priority2 = detmPriority(outcome2);
- } else {
- logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(operation);
+ if (priority2 > priority) {
+ outcome = outcome2;
+ priority = priority2;
+ }
}
+
+ logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(),
+ (outcome == null ? null : outcome.getResult()), params.getRequestId());
+
+ future.complete(outcome);
};
}
/**
- * Converts an exception into an operation outcome, returning a copy of the outcome to
- * prevent background jobs from changing it.
+ * Determines the priority of an outcome based on its result.
*
- * @param params operation parameters
- * @param operation current operation
- * @return a function that will convert an exception into an operation outcome
+ * @param outcome outcome to examine, or {@code null}
+ * @return the outcome's priority
*/
- private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params,
- ControlLoopOperation operation) {
+ protected int detmPriority(OperationOutcome outcome) {
+ if (outcome == null) {
+ return 1;
+ }
- return thrown -> {
- logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(),
- params.getRequestId(), thrown);
+ switch (outcome.getResult()) {
+ case SUCCESS:
+ return 0;
+
+ case FAILURE_GUARD:
+ return 2;
+
+ case FAILURE_RETRIES:
+ return 3;
+
+ case FAILURE:
+ return 4;
+
+ case FAILURE_TIMEOUT:
+ return 5;
+
+ case FAILURE_EXCEPTION:
+ default:
+ return 6;
+ }
+ }
+
+ /**
+ * Performs a task, after verifying that the controller is still running. Also checks
+ * that the previous outcome was successful, if specified.
+ *
+ * @param params operation parameters
+ * @param controller overall pipeline controller
+ * @param checkSuccess {@code true} to check the previous outcome, {@code false}
+ * otherwise
+ * @param outcome outcome of the previous task
+ * @param tasks tasks to be performed
+ * @return a function to perform the task. If everything checks out, then it returns
+ * the task's future. Otherwise, it returns an incomplete future and completes
+ * the controller instead.
+ */
+ // @formatter:off
+ protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params,
+ PipelineControllerFuture<OperationOutcome> controller,
+ boolean checkSuccess, OperationOutcome outcome,
+ CompletableFuture<OperationOutcome> task) {
+ // @formatter:on
+ if (checkSuccess && !isSuccess(outcome)) {
/*
- * Must make a copy of the operation, as the original could be changed by
- * background jobs that might still be running.
+ * must complete before canceling so that cancel() doesn't cause controller to
+ * complete
*/
- return setOutcome(params, new ControlLoopOperation(operation), thrown);
- };
+ controller.complete(outcome);
+ task.cancel(false);
+ return new CompletableFuture<>();
+ }
+
+ return controller.wrap(task);
}
/**
- * Gets a function to verify that the operation is still running. If the pipeline is
- * not running, then it returns an incomplete future, which will effectively halt
- * subsequent operations in the pipeline. This method is intended to be used with one
- * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods.
+ * Performs a task, after verifying that the controller is still running. Also checks
+ * that the previous outcome was successful, if specified.
*
- * @param controller pipeline controller
* @param params operation parameters
- * @return a function to verify that the operation is still running
+ * @param controller overall pipeline controller
+ * @param checkSuccess {@code true} to check the previous outcome, {@code false}
+ * otherwise
+ * @param tasks tasks to be performed
+ * @return a function to perform the task. If everything checks out, then it returns
+ * the task's future. Otherwise, it returns an incomplete future and completes
+ * the controller instead.
*/
- protected <T> Function<T, CompletableFuture<T>> verifyRunning(
- PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) {
+ // @formatter:off
+ protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params,
+ PipelineControllerFuture<OperationOutcome> controller,
+ boolean checkSuccess,
+ Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
+ // @formatter:on
+
+ return outcome -> {
+
+ if (!controller.isRunning()) {
+ return new CompletableFuture<>();
+ }
- return value -> {
- boolean running = controller.isRunning();
- logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId());
+ if (checkSuccess && !isSuccess(outcome)) {
+ controller.complete(outcome);
+ return new CompletableFuture<>();
+ }
- return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>());
+ return controller.wrap(task.apply(outcome));
};
}
* @param callbacks used to determine if the start callback can be invoked
* @return a function that sets the start time and invokes the callback
*/
- private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params,
+ private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params,
CallbackManager callbacks) {
- return outcome -> {
+ return (outcome, thrown) -> {
if (callbacks.canStart()) {
// haven't invoked "start" callback yet
outcome.setEnd(null);
params.callbackStarted(outcome);
}
-
- return outcome;
};
}
* @param callbacks used to determine if the end callback can be invoked
* @return a function that sets the end time and invokes the callback
*/
- private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params,
+ private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params,
CallbackManager callbacks) {
- return operation -> {
+ return (outcome, thrown) -> {
if (callbacks.canEnd()) {
- operation.setStart(callbacks.getStartTime());
- operation.setEnd(callbacks.getEndTime());
- params.callbackCompleted(operation);
+ outcome.setStart(callbacks.getStartTime());
+ outcome.setEnd(callbacks.getEndTime());
+ params.callbackCompleted(outcome);
}
-
- return operation;
};
}
* @param operation operation to be updated
* @return the updated operation
*/
- protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
Throwable thrown) {
PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
return setOutcome(params, operation, result);
* @param result result of the operation
* @return the updated operation
*/
- protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+ protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
PolicyResult result) {
logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
- operation.setOutcome(result.toString());
+ operation.setResult(result);
operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
: ControlLoopOperation.FAILED_MSG);
* Gets the operation timeout. Subclasses may override this method to obtain the
* timeout in some other way (e.g., through configuration properties).
*
- * @param policy policy from which to extract the timeout
+ * @param timeoutSec timeout, in seconds, or {@code null}
* @return the operation timeout, in milliseconds
*/
- protected long getTimeOutMillis(Policy policy) {
- Integer timeoutSec = policy.getTimeout();
+ protected long getTimeOutMillis(Integer timeoutSec) {
return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
}
-
- /**
- * Manager for "start" and "end" callbacks.
- */
- private static class CallbackManager implements Runnable {
- private final AtomicReference<Instant> startTime = new AtomicReference<>();
- private final AtomicReference<Instant> endTime = new AtomicReference<>();
-
- /**
- * Determines if the "start" callback can be invoked. If so, it sets the
- * {@link #startTime} to the current time.
- *
- * @return {@code true} if the "start" callback can be invoked, {@code false}
- * otherwise
- */
- public boolean canStart() {
- return startTime.compareAndSet(null, Instant.now());
- }
-
- /**
- * Determines if the "end" callback can be invoked. If so, it sets the
- * {@link #endTime} to the current time.
- *
- * @return {@code true} if the "end" callback can be invoked, {@code false}
- * otherwise
- */
- public boolean canEnd() {
- return endTime.compareAndSet(null, Instant.now());
- }
-
- /**
- * Gets the start time.
- *
- * @return the start time, or {@code null} if {@link #canStart()} has not been
- * invoked yet.
- */
- public Instant getStartTime() {
- return startTime.get();
- }
-
- /**
- * Gets the end time.
- *
- * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked
- * yet.
- */
- public Instant getEndTime() {
- return endTime.get();
- }
-
- /**
- * Prevents further callbacks from being executed by setting {@link #startTime}
- * and {@link #endTime}.
- */
- @Override
- public void run() {
- canStart();
- canEnd();
- }
- }
}