Clean up and enhancement of Actor re-design
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperatorPartial.java
index 80d8fbd..df5258d 100644 (file)
 
 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
-import java.time.Instant;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
+import lombok.AccessLevel;
 import lombok.Getter;
+import lombok.Setter;
 import org.onap.policy.controlloop.ControlLoopOperation;
+import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 import org.onap.policy.controlloop.actorserviceprovider.Operator;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
-import org.onap.policy.controlloop.policy.Policy;
 import org.onap.policy.controlloop.policy.PolicyResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Partial implementation of an operator. Subclasses can choose to simply implement
- * {@link #doOperation(ControlLoopOperationParams)}, or they may choose to override
- * {@link #doOperationAsFuture(ControlLoopOperationParams)}.
+ * Partial implementation of an operator. In general, it's preferable that subclasses
+ * would override
+ * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome)
+ * startOperationAsync()}. However, if that proves to be too difficult, then they can
+ * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome)
+ * doOperation()}. In addition, if the operation requires any preprocessor steps, the
+ * subclass may choose to override
+ * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}.
+ * <p/>
+ * The futures returned by the methods within this class can be canceled, and will
+ * propagate the cancellation to any subtasks. Thus it is also expected that any futures
+ * returned by overridden methods will do the same. Of course, if a class overrides
+ * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()},
+ * then there's little that can be done to cancel that particular operation.
  */
 public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
 
     private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
 
-    private static final String OUTCOME_SUCCESS = PolicyResult.SUCCESS.toString();
-    private static final String OUTCOME_FAILURE = PolicyResult.FAILURE.toString();
-    private static final String OUTCOME_RETRIES = PolicyResult.FAILURE_RETRIES.toString();
+    /**
+     * Executor to be used for tasks that may perform blocking I/O. The default executor
+     * simply launches a new thread for each command that is submitted to it.
+     * <p/>
+     * May be overridden by junit tests.
+     */
+    @Getter(AccessLevel.PROTECTED)
+    @Setter(AccessLevel.PROTECTED)
+    private Executor blockingExecutor = command -> {
+        Thread thread = new Thread(command);
+        thread.setDaemon(true);
+        thread.start();
+    };
 
     @Getter
     private final String actorName;
@@ -103,94 +127,52 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
     }
 
     @Override
-    public final CompletableFuture<ControlLoopOperation> startOperation(ControlLoopOperationParams params) {
+    public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) {
         if (!isAlive()) {
             throw new IllegalStateException("operation is not running: " + getFullName());
         }
 
-        final Executor executor = params.getExecutor();
-
         // allocate a controller for the entire operation
-        final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
-        CompletableFuture<ControlLoopOperation> preproc = startPreprocessor(params);
+        CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params);
         if (preproc == null) {
             // no preprocessor required - just start the operation
             return startOperationAttempt(params, controller, 1);
         }
 
-        // propagate "stop" to the preprocessor
-        controller.add(preproc);
-
         /*
          * Do preprocessor first and then, if successful, start the operation. Note:
          * operations create their own outcome, ignoring the outcome from any previous
          * steps.
+         *
+         * Wrap the preprocessor to ensure "stop" is propagated to it.
          */
-        preproc.whenCompleteAsync(controller.delayedRemove(preproc), executor)
-                        .thenComposeAsync(handleFailure(params, controller), executor)
-                        .thenComposeAsync(onSuccess(params, unused -> startOperationAttempt(params, controller, 1)),
-                                        executor);
-
-        return controller;
-    }
-
-    /**
-     * Starts an operation's preprocessor step(s). If the preprocessor fails, then it
-     * invokes the started and completed call-backs.
-     *
-     * @param params operation parameters
-     * @return a future that will return the preprocessor outcome, or {@code null} if this
-     *         operation needs no preprocessor
-     */
-    protected CompletableFuture<ControlLoopOperation> startPreprocessor(ControlLoopOperationParams params) {
-        logger.info("{}: start low-level operation preprocessor for {}", getFullName(), params.getRequestId());
-
-        final Executor executor = params.getExecutor();
-        final ControlLoopOperation operation = params.makeOutcome();
-
-        final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
-                        doPreprocessorAsFuture(params);
-        if (preproc == null) {
-            // no preprocessor required
-            return null;
-        }
-
-        // allocate a controller for the preprocessor steps
-        final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
-
-        /*
-         * Don't mark it complete until we've built the whole pipeline. This will prevent
-         * the operation from starting until after it has been successfully built (i.e.,
-         * without generating any exceptions).
-         */
-        final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
-
         // @formatter:off
-        firstFuture
-            .thenComposeAsync(controller.add(preproc), executor)
-            .exceptionally(fromException(params, operation))
-            .whenCompleteAsync(controller.delayedComplete(), executor);
+        controller.wrap(preproc)
+                        .exceptionally(fromException(params, "preprocessor of operation"))
+                        .thenCompose(handlePreprocessorFailure(params, controller))
+                        .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1));
         // @formatter:on
 
-        // start the pipeline
-        firstFuture.complete(operation);
-
         return controller;
     }
 
     /**
      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
-     * invokes the call-backs and returns a failed outcome. Otherwise, it returns the
-     * outcome that it received.
+     * invokes the call-backs, marks the controller complete, and returns an incomplete
+     * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
+     * received.
+     * <p/>
+     * Assumes that no callbacks have been invoked yet.
      *
      * @param params operation parameters
      * @param controller pipeline controller
      * @return a function that checks the outcome status and continues, if successful, or
      *         indicates a failure otherwise
      */
-    private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> handleFailure(
-                    ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller) {
+    private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
+                    ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) {
 
         return outcome -> {
 
@@ -207,19 +189,21 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
             // propagate "stop" to the callbacks
             controller.add(callbacks);
 
-            final ControlLoopOperation outcome2 = params.makeOutcome();
+            final OperationOutcome outcome2 = params.makeOutcome();
 
             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
 
-            outcome2.setOutcome(PolicyResult.FAILURE_GUARD.toString());
+            outcome2.setResult(PolicyResult.FAILURE_GUARD);
             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
 
-            CompletableFuture.completedFuture(outcome2).thenApplyAsync(callbackStarted(params, callbacks), executor)
-                            .thenApplyAsync(callbackCompleted(params, callbacks), executor)
-                            .whenCompleteAsync(controller.delayedRemove(callbacks), executor)
+            // @formatter:off
+            CompletableFuture.completedFuture(outcome2)
+                            .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+                            .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
                             .whenCompleteAsync(controller.delayedComplete(), executor);
+            // @formatter:on
 
-            return controller;
+            return new CompletableFuture<>();
         };
     }
 
@@ -237,8 +221,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @return a function that will start the preprocessor and returns its outcome, or
      *         {@code null} if this operation needs no preprocessor
      */
-    protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
-                    ControlLoopOperationParams params) {
+    protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
         return null;
     }
 
@@ -251,20 +234,12 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @param attempt attempt number, typically starting with 1
      * @return a future that will return the final result of all attempts
      */
-    private CompletableFuture<ControlLoopOperation> startOperationAttempt(ControlLoopOperationParams params,
-                    PipelineControllerFuture<ControlLoopOperation> controller, int attempt) {
-
-        final Executor executor = params.getExecutor();
-
-        CompletableFuture<ControlLoopOperation> future = startAttemptWithoutRetries(params, attempt);
+    private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params,
+                    PipelineControllerFuture<OperationOutcome> controller, int attempt) {
 
         // propagate "stop" to the operation attempt
-        controller.add(future);
-
-        // detach when complete
-        future.whenCompleteAsync(controller.delayedRemove(future), executor)
-                        .thenComposeAsync(retryOnFailure(params, controller, attempt), params.getExecutor())
-                        .whenCompleteAsync(controller.delayedComplete(), executor);
+        controller.wrap(startAttemptWithoutRetries(params, attempt))
+                        .thenCompose(retryOnFailure(params, controller, attempt));
 
         return controller;
     }
@@ -276,40 +251,32 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @param attempt attempt number, typically starting with 1
      * @return a future that will return the result of a single operation attempt
      */
-    private CompletableFuture<ControlLoopOperation> startAttemptWithoutRetries(ControlLoopOperationParams params,
+    private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params,
                     int attempt) {
 
         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
 
         final Executor executor = params.getExecutor();
-        final ControlLoopOperation outcome = params.makeOutcome();
+        final OperationOutcome outcome = params.makeOutcome();
         final CallbackManager callbacks = new CallbackManager();
 
         // this operation attempt gets its own controller
-        final PipelineControllerFuture<ControlLoopOperation> controller = new PipelineControllerFuture<>();
+        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
         // propagate "stop" to the callbacks
         controller.add(callbacks);
 
-        /*
-         * Don't mark it complete until we've built the whole pipeline. This will prevent
-         * the operation from starting until after it has been successfully built (i.e.,
-         * without generating any exceptions).
-         */
-        final CompletableFuture<ControlLoopOperation> firstFuture = new CompletableFuture<>();
-
         // @formatter:off
-        CompletableFuture<ControlLoopOperation> future2 =
-            firstFuture.thenComposeAsync(verifyRunning(controller, params), executor)
-                        .thenApplyAsync(callbackStarted(params, callbacks), executor)
-                        .thenComposeAsync(controller.add(doOperationAsFuture(params, attempt)), executor);
+        CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
+                        .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+                        .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2)));
         // @formatter:on
 
         // handle timeouts, if specified
-        long timeoutMillis = getTimeOutMillis(params.getPolicy());
+        long timeoutMillis = getTimeOutMillis(params.getTimeoutSec());
         if (timeoutMillis > 0) {
             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
-            future2 = future2.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+            future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
         }
 
         /*
@@ -321,16 +288,13 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
          */
 
         // @formatter:off
-        future2.exceptionally(fromException(params, outcome))
-                    .thenApplyAsync(setRetryFlag(params, attempt), executor)
-                    .thenApplyAsync(callbackStarted(params, callbacks), executor)
-                    .thenApplyAsync(callbackCompleted(params, callbacks), executor)
+        future.exceptionally(fromException(params, "operation"))
+                    .thenApply(setRetryFlag(params, attempt))
+                    .whenCompleteAsync(callbackStarted(params, callbacks), executor)
+                    .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
                     .whenCompleteAsync(controller.delayedComplete(), executor);
         // @formatter:on
 
-        // start the pipeline
-        firstFuture.complete(outcome);
-
         return controller;
     }
 
@@ -340,8 +304,8 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @param outcome outcome to examine
      * @return {@code true} if the outcome was successful
      */
-    protected boolean isSuccess(ControlLoopOperation outcome) {
-        return OUTCOME_SUCCESS.equals(outcome.getOutcome());
+    protected boolean isSuccess(OperationOutcome outcome) {
+        return (outcome.getResult() == PolicyResult.SUCCESS);
     }
 
     /**
@@ -351,13 +315,29 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @return {@code true} if the outcome is not {@code null} and was a failure
      *         <i>and</i> was associated with this operator, {@code false} otherwise
      */
-    protected boolean isActorFailed(ControlLoopOperation outcome) {
-        return OUTCOME_FAILURE.equals(getActorOutcome(outcome));
+    protected boolean isActorFailed(OperationOutcome outcome) {
+        return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
+    }
+
+    /**
+     * Determines if the given outcome is for this operation.
+     *
+     * @param outcome outcome to examine
+     * @return {@code true} if the outcome is for this operation, {@code false} otherwise
+     */
+    protected boolean isSameOperation(OperationOutcome outcome) {
+        return OperationOutcome.isFor(outcome, getActorName(), getName());
     }
 
     /**
      * Invokes the operation as a "future". This method simply invokes
-     * {@link #doOperation(ControlLoopOperationParams)} turning it into a "future".
+     * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor
+     * "blocking executor"}, returning the result via a "future".
+     * <p/>
+     * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
+     * the executor in the "params", as that may bring the background thread pool to a
+     * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
+     * instead.
      * <p/>
      * This method assumes the following:
      * <ul>
@@ -373,31 +353,23 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @return a function that will start the operation and return its result when
      *         complete
      */
-    protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
-                    ControlLoopOperationParams params, int attempt) {
-
-        /*
-         * TODO As doOperation() may perform blocking I/O, this should be launched in its
-         * own thread to prevent the ForkJoinPool from being tied up. Should probably
-         * provide a method to make that easy.
-         */
+    protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt,
+                    OperationOutcome outcome) {
 
-        return operation -> CompletableFuture.supplyAsync(() -> doOperation(params, attempt, operation),
-                        params.getExecutor());
+        return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor());
     }
 
     /**
      * Low-level method that performs the operation. This can make the same assumptions
      * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
-     * method throws an {@link UnsupportedOperationException}.
+     * particular method simply throws an {@link UnsupportedOperationException}.
      *
      * @param params operation parameters
      * @param attempt attempt number, typically starting with 1
      * @param operation the operation being performed
      * @return the outcome of the operation
      */
-    protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
-                    ControlLoopOperation operation) {
+    protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) {
 
         throw new UnsupportedOperationException("start operation " + getFullName());
     }
@@ -411,8 +383,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @param attempt latest attempt number, starting with 1
      * @return a function to get the next future to execute
      */
-    private Function<ControlLoopOperation, ControlLoopOperation> setRetryFlag(ControlLoopOperationParams params,
-                    int attempt) {
+    private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) {
 
         return operation -> {
             if (operation != null && !isActorFailed(operation)) {
@@ -424,22 +395,22 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
             }
 
             // get a non-null operation
-            ControlLoopOperation oper2;
+            OperationOutcome oper2;
             if (operation != null) {
                 oper2 = operation;
             } else {
                 oper2 = params.makeOutcome();
-                oper2.setOutcome(OUTCOME_FAILURE);
+                oper2.setResult(PolicyResult.FAILURE);
             }
 
-            if (params.getPolicy().getRetry() != null && params.getPolicy().getRetry() > 0
-                            && attempt > params.getPolicy().getRetry()) {
+            Integer retry = params.getRetry();
+            if (retry != null && retry > 0 && attempt > retry) {
                 /*
                  * retries were specified and we've already tried them all - change to
                  * FAILURE_RETRIES
                  */
                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
-                oper2.setOutcome(OUTCOME_RETRIES);
+                oper2.setResult(PolicyResult.FAILURE_RETRIES);
             }
 
             return oper2;
@@ -456,21 +427,24 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @param attempt latest attempt number, starting with 1
      * @return a function to get the next future to execute
      */
-    private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> retryOnFailure(
-                    ControlLoopOperationParams params, PipelineControllerFuture<ControlLoopOperation> controller,
+    private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
+                    ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller,
                     int attempt) {
 
         return operation -> {
             if (!isActorFailed(operation)) {
                 // wrong type or wrong operation - just leave it as is
                 logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
-                return CompletableFuture.completedFuture(operation);
+                controller.complete(operation);
+                return new CompletableFuture<>();
             }
 
-            if (params.getPolicy().getRetry() == null || params.getPolicy().getRetry() <= 0) {
+            Integer retry = params.getRetry();
+            if (retry == null || retry <= 0) {
                 // no retries - already marked as FAILURE, so just return it
                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
-                return CompletableFuture.completedFuture(operation);
+                controller.complete(operation);
+                return new CompletableFuture<>();
             }
 
 
@@ -484,100 +458,279 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
     }
 
     /**
-     * Gets the outcome of an operation for this operation.
+     * Converts an exception into an operation outcome, returning a copy of the outcome to
+     * prevent background jobs from changing it.
      *
-     * @param operation operation whose outcome is to be extracted
-     * @return the outcome of the given operation, if it's for this operator, {@code null}
-     *         otherwise
+     * @param params operation parameters
+     * @param type type of item throwing the exception
+     * @return a function that will convert an exception into an operation outcome
      */
-    protected String getActorOutcome(ControlLoopOperation operation) {
-        if (operation == null) {
-            return null;
-        }
+    private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) {
 
-        if (!getActorName().equals(operation.getActor())) {
-            return null;
-        }
+        return thrown -> {
+            OperationOutcome outcome = params.makeOutcome();
+
+            logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
+                            params.getRequestId(), thrown);
+
+            return setOutcome(params, outcome, thrown);
+        };
+    }
+
+    /**
+     * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
+     * any outstanding futures when one completes.
+     *
+     * @param params operation parameters
+     * @param futures futures for which to wait
+     * @return a future to cancel or await an outcome. If this future is canceled, then
+     *         all of the futures will be canceled
+     */
+    protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
+                    List<CompletableFuture<OperationOutcome>> futures) {
+
+        // convert list to an array
+        @SuppressWarnings("rawtypes")
+        CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+
+        @SuppressWarnings("unchecked")
+        CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures);
+        return result;
+    }
+
+    /**
+     * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
+     * outstanding futures when one completes.
+     *
+     * @param params operation parameters
+     * @param futures futures for which to wait
+     * @return a future to cancel or await an outcome. If this future is canceled, then
+     *         all of the futures will be canceled
+     */
+    protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
+                    @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+        final Executor executor = params.getExecutor();
+        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+        attachFutures(controller, futures);
+
+        // @formatter:off
+        CompletableFuture.anyOf(futures)
+                            .thenApply(object -> (OperationOutcome) object)
+                            .whenCompleteAsync(controller.delayedComplete(), executor);
+        // @formatter:on
+
+        return controller;
+    }
+
+    /**
+     * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
+     * the futures if returned future is canceled. The future returns the "worst" outcome,
+     * based on priority (see {@link #detmPriority(OperationOutcome)}).
+     *
+     * @param params operation parameters
+     * @param futures futures for which to wait
+     * @return a future to cancel or await an outcome. If this future is canceled, then
+     *         all of the futures will be canceled
+     */
+    protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
+                    List<CompletableFuture<OperationOutcome>> futures) {
 
-        if (!getName().equals(operation.getOperation())) {
-            return null;
+        // convert list to an array
+        @SuppressWarnings("rawtypes")
+        CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+
+        @SuppressWarnings("unchecked")
+        CompletableFuture<OperationOutcome> result = allOf(params, arrFutures);
+        return result;
+    }
+
+    /**
+     * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
+     * futures if returned future is canceled. The future returns the "worst" outcome,
+     * based on priority (see {@link #detmPriority(OperationOutcome)}).
+     *
+     * @param params operation parameters
+     * @param futures futures for which to wait
+     * @return a future to cancel or await an outcome. If this future is canceled, then
+     *         all of the futures will be canceled
+     */
+    protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
+                    @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+        attachFutures(controller, futures);
+
+        OperationOutcome[] outcomes = new OperationOutcome[futures.length];
+
+        @SuppressWarnings("rawtypes")
+        CompletableFuture[] futures2 = new CompletableFuture[futures.length];
+
+        // record the outcomes of each future when it completes
+        for (int count = 0; count < futures2.length; ++count) {
+            final int count2 = count;
+            futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
         }
 
-        return operation.getOutcome();
+        CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes));
+
+        return controller;
     }
 
     /**
-     * Gets a function that will start the next step, if the current operation was
-     * successful, or just return the current operation, otherwise.
+     * Attaches the given futures to the controller.
+     *
+     * @param controller master controller for all of the futures
+     * @param futures futures to be attached to the controller
+     */
+    private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
+                    @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+
+        // attach each task
+        for (CompletableFuture<OperationOutcome> future : futures) {
+            controller.add(future);
+        }
+    }
+
+    /**
+     * Combines the outcomes from a set of tasks.
      *
      * @param params operation parameters
-     * @param nextStep function that will invoke the next step, passing it the operation
-     * @return a function that will start the next step
+     * @param future future to be completed with the combined result
+     * @param outcomes outcomes to be examined
      */
-    protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> onSuccess(
-                    ControlLoopOperationParams params,
-                    Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep) {
+    private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params,
+                    CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) {
 
-        return operation -> {
+        return (unused, thrown) -> {
+            if (thrown != null) {
+                future.completeExceptionally(thrown);
+                return;
+            }
 
-            if (operation == null) {
-                logger.trace("{}: null outcome - discarding next task for {}", getFullName(), params.getRequestId());
-                ControlLoopOperation outcome = params.makeOutcome();
-                outcome.setOutcome(OUTCOME_FAILURE);
-                return CompletableFuture.completedFuture(outcome);
+            // identify the outcome with the highest priority
+            OperationOutcome outcome = outcomes[0];
+            int priority = detmPriority(outcome);
 
-            } else if (isSuccess(operation)) {
-                logger.trace("{}: success - starting next task for {}", getFullName(), params.getRequestId());
-                return nextStep.apply(operation);
+            // start with "1", as we've already dealt with "0"
+            for (int count = 1; count < outcomes.length; ++count) {
+                OperationOutcome outcome2 = outcomes[count];
+                int priority2 = detmPriority(outcome2);
 
-            } else {
-                logger.trace("{}: failure - discarding next task for {}", getFullName(), params.getRequestId());
-                return CompletableFuture.completedFuture(operation);
+                if (priority2 > priority) {
+                    outcome = outcome2;
+                    priority = priority2;
+                }
             }
+
+            logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(),
+                            (outcome == null ? null : outcome.getResult()), params.getRequestId());
+
+            future.complete(outcome);
         };
     }
 
     /**
-     * Converts an exception into an operation outcome, returning a copy of the outcome to
-     * prevent background jobs from changing it.
+     * Determines the priority of an outcome based on its result.
      *
-     * @param params operation parameters
-     * @param operation current operation
-     * @return a function that will convert an exception into an operation outcome
+     * @param outcome outcome to examine, or {@code null}
+     * @return the outcome's priority
      */
-    private Function<Throwable, ControlLoopOperation> fromException(ControlLoopOperationParams params,
-                    ControlLoopOperation operation) {
+    protected int detmPriority(OperationOutcome outcome) {
+        if (outcome == null) {
+            return 1;
+        }
 
-        return thrown -> {
-            logger.warn("exception throw by operation {}.{} for {}", operation.getActor(), operation.getOperation(),
-                            params.getRequestId(), thrown);
+        switch (outcome.getResult()) {
+            case SUCCESS:
+                return 0;
+
+            case FAILURE_GUARD:
+                return 2;
+
+            case FAILURE_RETRIES:
+                return 3;
+
+            case FAILURE:
+                return 4;
+
+            case FAILURE_TIMEOUT:
+                return 5;
+
+            case FAILURE_EXCEPTION:
+            default:
+                return 6;
+        }
+    }
+
+    /**
+     * Performs a task, after verifying that the controller is still running. Also checks
+     * that the previous outcome was successful, if specified.
+     *
+     * @param params operation parameters
+     * @param controller overall pipeline controller
+     * @param checkSuccess {@code true} to check the previous outcome, {@code false}
+     *        otherwise
+     * @param outcome outcome of the previous task
+     * @param tasks tasks to be performed
+     * @return a function to perform the task. If everything checks out, then it returns
+     *         the task's future. Otherwise, it returns an incomplete future and completes
+     *         the controller instead.
+     */
+    // @formatter:off
+    protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params,
+                    PipelineControllerFuture<OperationOutcome> controller,
+                    boolean checkSuccess, OperationOutcome outcome,
+                    CompletableFuture<OperationOutcome> task) {
+        // @formatter:on
 
+        if (checkSuccess && !isSuccess(outcome)) {
             /*
-             * Must make a copy of the operation, as the original could be changed by
-             * background jobs that might still be running.
+             * must complete before canceling so that cancel() doesn't cause controller to
+             * complete
              */
-            return setOutcome(params, new ControlLoopOperation(operation), thrown);
-        };
+            controller.complete(outcome);
+            task.cancel(false);
+            return new CompletableFuture<>();
+        }
+
+        return controller.wrap(task);
     }
 
     /**
-     * Gets a function to verify that the operation is still running. If the pipeline is
-     * not running, then it returns an incomplete future, which will effectively halt
-     * subsequent operations in the pipeline. This method is intended to be used with one
-     * of the {@link CompletableFuture}'s <i>thenCompose()</i> methods.
+     * Performs a task, after verifying that the controller is still running. Also checks
+     * that the previous outcome was successful, if specified.
      *
-     * @param controller pipeline controller
      * @param params operation parameters
-     * @return a function to verify that the operation is still running
+     * @param controller overall pipeline controller
+     * @param checkSuccess {@code true} to check the previous outcome, {@code false}
+     *        otherwise
+     * @param tasks tasks to be performed
+     * @return a function to perform the task. If everything checks out, then it returns
+     *         the task's future. Otherwise, it returns an incomplete future and completes
+     *         the controller instead.
      */
-    protected <T> Function<T, CompletableFuture<T>> verifyRunning(
-                    PipelineControllerFuture<ControlLoopOperation> controller, ControlLoopOperationParams params) {
+    // @formatter:off
+    protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params,
+                    PipelineControllerFuture<OperationOutcome> controller,
+                    boolean checkSuccess,
+                    Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
+        // @formatter:on
+
+        return outcome -> {
+
+            if (!controller.isRunning()) {
+                return new CompletableFuture<>();
+            }
 
-        return value -> {
-            boolean running = controller.isRunning();
-            logger.trace("{}: verify running {} for {}", getFullName(), running, params.getRequestId());
+            if (checkSuccess && !isSuccess(outcome)) {
+                controller.complete(outcome);
+                return new CompletableFuture<>();
+            }
 
-            return (running ? CompletableFuture.completedFuture(value) : new CompletableFuture<>());
+            return controller.wrap(task.apply(outcome));
         };
     }
 
@@ -591,10 +744,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @param callbacks used to determine if the start callback can be invoked
      * @return a function that sets the start time and invokes the callback
      */
-    private Function<ControlLoopOperation, ControlLoopOperation> callbackStarted(ControlLoopOperationParams params,
+    private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params,
                     CallbackManager callbacks) {
 
-        return outcome -> {
+        return (outcome, thrown) -> {
 
             if (callbacks.canStart()) {
                 // haven't invoked "start" callback yet
@@ -602,8 +755,6 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
                 outcome.setEnd(null);
                 params.callbackStarted(outcome);
             }
-
-            return outcome;
         };
     }
 
@@ -621,18 +772,16 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @param callbacks used to determine if the end callback can be invoked
      * @return a function that sets the end time and invokes the callback
      */
-    private Function<ControlLoopOperation, ControlLoopOperation> callbackCompleted(ControlLoopOperationParams params,
+    private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params,
                     CallbackManager callbacks) {
 
-        return operation -> {
+        return (outcome, thrown) -> {
 
             if (callbacks.canEnd()) {
-                operation.setStart(callbacks.getStartTime());
-                operation.setEnd(callbacks.getEndTime());
-                params.callbackCompleted(operation);
+                outcome.setStart(callbacks.getStartTime());
+                outcome.setEnd(callbacks.getEndTime());
+                params.callbackCompleted(outcome);
             }
-
-            return operation;
         };
     }
 
@@ -643,7 +792,7 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @param operation operation to be updated
      * @return the updated operation
      */
-    protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+    protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
                     Throwable thrown) {
         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
         return setOutcome(params, operation, result);
@@ -657,10 +806,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * @param result result of the operation
      * @return the updated operation
      */
-    protected ControlLoopOperation setOutcome(ControlLoopOperationParams params, ControlLoopOperation operation,
+    protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
                     PolicyResult result) {
         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
-        operation.setOutcome(result.toString());
+        operation.setResult(result);
         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
                         : ControlLoopOperation.FAILED_MSG);
 
@@ -687,71 +836,10 @@ public abstract class OperatorPartial extends StartConfigPartial<Map<String, Obj
      * Gets the operation timeout. Subclasses may override this method to obtain the
      * timeout in some other way (e.g., through configuration properties).
      *
-     * @param policy policy from which to extract the timeout
+     * @param timeoutSec timeout, in seconds, or {@code null}
      * @return the operation timeout, in milliseconds
      */
-    protected long getTimeOutMillis(Policy policy) {
-        Integer timeoutSec = policy.getTimeout();
+    protected long getTimeOutMillis(Integer timeoutSec) {
         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
     }
-
-    /**
-     * Manager for "start" and "end" callbacks.
-     */
-    private static class CallbackManager implements Runnable {
-        private final AtomicReference<Instant> startTime = new AtomicReference<>();
-        private final AtomicReference<Instant> endTime = new AtomicReference<>();
-
-        /**
-         * Determines if the "start" callback can be invoked. If so, it sets the
-         * {@link #startTime} to the current time.
-         *
-         * @return {@code true} if the "start" callback can be invoked, {@code false}
-         *         otherwise
-         */
-        public boolean canStart() {
-            return startTime.compareAndSet(null, Instant.now());
-        }
-
-        /**
-         * Determines if the "end" callback can be invoked. If so, it sets the
-         * {@link #endTime} to the current time.
-         *
-         * @return {@code true} if the "end" callback can be invoked, {@code false}
-         *         otherwise
-         */
-        public boolean canEnd() {
-            return endTime.compareAndSet(null, Instant.now());
-        }
-
-        /**
-         * Gets the start time.
-         *
-         * @return the start time, or {@code null} if {@link #canStart()} has not been
-         *         invoked yet.
-         */
-        public Instant getStartTime() {
-            return startTime.get();
-        }
-
-        /**
-         * Gets the end time.
-         *
-         * @return the end time, or {@code null} if {@link #canEnd()} has not been invoked
-         *         yet.
-         */
-        public Instant getEndTime() {
-            return endTime.get();
-        }
-
-        /**
-         * Prevents further callbacks from being executed by setting {@link #startTime}
-         * and {@link #endTime}.
-         */
-        @Override
-        public void run() {
-            canStart();
-            canEnd();
-        }
-    }
 }