Make targetEntity a property
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartial.java
index d00b88b..b5cc15e 100644 (file)
 
 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
@@ -28,11 +38,24 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
 import org.onap.policy.controlloop.ControlLoopOperation;
 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 import org.onap.policy.controlloop.policy.PolicyResult;
 import org.slf4j.Logger;
@@ -51,55 +74,91 @@ import org.slf4j.LoggerFactory;
  * returned by overridden methods will do the same. Of course, if a class overrides
  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
  * be done to cancel that particular operation.
+ * <p/>
+ * In general tasks in a pipeline are executed by the same thread. However, the following
+ * should always be executed via the executor specified in "params":
+ * <ul>
+ * <li>start callback</li>
+ * <li>completion callback</li>
+ * <li>controller completion (i.e., delayedComplete())</li>
+ * </ul>
  */
 public abstract class OperationPartial implements Operation {
-
     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
-    public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
+    private static final Coder coder = new StandardCoder();
 
-    // values extracted from the operator
+    public static final String GUARD_ACTOR_NAME = "GUARD";
+    public static final String GUARD_OPERATION_NAME = "Decision";
+    public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
 
-    private final OperatorPartial operator;
+    private final OperatorConfig config;
 
     /**
      * Operation parameters.
      */
     protected final ControlLoopOperationParams params;
 
+    @Getter
+    private final String fullName;
+
+    @Getter
+    @Setter(AccessLevel.PROTECTED)
+    private String subRequestId;
+
+    @Getter
+    private final List<String> propertyNames;
+
+    /**
+     * Values for the properties identified by {@link #getPropertyNames()}.
+     */
+    private final Map<String, Object> properties = new HashMap<>();
+
 
     /**
      * Constructs the object.
      *
      * @param params operation parameters
-     * @param operator operator that created this operation
+     * @param config configuration for this operation
+     * @param propertyNames names of properties required by this operation
      */
-    public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) {
+    public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
         this.params = params;
-        this.operator = operator;
+        this.config = config;
+        this.fullName = params.getActor() + "." + params.getOperation();
+        this.propertyNames = propertyNames;
     }
 
     public Executor getBlockingExecutor() {
-        return operator.getBlockingExecutor();
-    }
-
-    public String getFullName() {
-        return operator.getFullName();
+        return config.getBlockingExecutor();
     }
 
     public String getActorName() {
-        return operator.getActorName();
+        return params.getActor();
     }
 
     public String getName() {
-        return operator.getName();
+        return params.getOperation();
     }
 
     @Override
-    public final CompletableFuture<OperationOutcome> start() {
-        if (!operator.isAlive()) {
-            throw new IllegalStateException("operation is not running: " + getFullName());
-        }
+    public boolean containsProperty(String name) {
+        return properties.containsKey(name);
+    }
+
+    @Override
+    public void setProperty(String name, Object value) {
+        logger.info("{}: set property {}={}", getFullName(), name, value);
+        properties.put(name, value);
+    }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T getProperty(String name) {
+        return (T) properties.get(name);
+    }
+
+    @Override
+    public CompletableFuture<OperationOutcome> start() {
         // allocate a controller for the entire operation
         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
@@ -144,7 +203,7 @@ public abstract class OperationPartial implements Operation {
 
         return outcome -> {
 
-            if (outcome != null && isSuccess(outcome)) {
+            if (isSuccess(outcome)) {
                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
                 return CompletableFuture.completedFuture(outcome);
             }
@@ -157,10 +216,11 @@ public abstract class OperationPartial implements Operation {
             // propagate "stop" to the callbacks
             controller.add(callbacks);
 
-            final OperationOutcome outcome2 = params.makeOutcome();
+            final OperationOutcome outcome2 = params.makeOutcome(getTargetEntity());
 
             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
 
+            outcome2.setFinalOutcome(true);
             outcome2.setResult(PolicyResult.FAILURE_GUARD);
             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
 
@@ -177,7 +237,7 @@ public abstract class OperationPartial implements Operation {
 
     /**
      * Invokes the operation's preprocessor step(s) as a "future". This method simply
-     * invokes {@link #startGuardAsync()}.
+     * returns {@code null}.
      * <p/>
      * This method assumes the following:
      * <ul>
@@ -189,12 +249,11 @@ public abstract class OperationPartial implements Operation {
      *         {@code null} if this operation needs no preprocessor
      */
     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
-        return startGuardAsync();
+        return null;
     }
 
     /**
-     * Invokes the operation's guard step(s) as a "future". This method simply returns
-     * {@code null}.
+     * Invokes the operation's guard step(s) as a "future".
      * <p/>
      * This method assumes the following:
      * <ul>
@@ -206,7 +265,40 @@ public abstract class OperationPartial implements Operation {
      *         {@code null} if this operation has no guard
      */
     protected CompletableFuture<OperationOutcome> startGuardAsync() {
-        return null;
+        if (params.isPreprocessed()) {
+            return null;
+        }
+
+        // get the guard payload
+        Map<String, Object> payload = makeGuardPayload();
+
+        /*
+         * Note: can't use constants from actor.guard, because that would create a
+         * circular dependency.
+         */
+        return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
+                        .payload(payload).build().start();
+    }
+
+    /**
+     * Creates a payload to execute a guard operation.
+     *
+     * @return a new guard payload
+     */
+    protected Map<String, Object> makeGuardPayload() {
+        // TODO delete this once preprocessing is done by the application
+        Map<String, Object> guard = new LinkedHashMap<>();
+        guard.put("actor", params.getActor());
+        guard.put("operation", params.getOperation());
+        guard.put("target", getTargetEntity());
+        guard.put("requestId", params.getRequestId());
+
+        String clname = params.getContext().getEvent().getClosedLoopControlName();
+        if (clname != null) {
+            guard.put("clname", clname);
+        }
+
+        return guard;
     }
 
     /**
@@ -220,6 +312,8 @@ public abstract class OperationPartial implements Operation {
     private CompletableFuture<OperationOutcome> startOperationAttempt(
                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
 
+        generateSubRequestId(attempt);
+
         // propagate "stop" to the operation attempt
         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
@@ -227,6 +321,17 @@ public abstract class OperationPartial implements Operation {
         return controller;
     }
 
+    /**
+     * Generates and sets {@link #subRequestId} to a new subrequest ID.
+     *
+     * @param attempt attempt number, typically starting with 1
+     */
+    public void generateSubRequestId(int attempt) {
+        // Note: this should be "protected", but that makes junits much messier
+
+        setSubRequestId(UUID.randomUUID().toString());
+    }
+
     /**
      * Starts the operation attempt, without doing any retries.
      *
@@ -239,7 +344,7 @@ public abstract class OperationPartial implements Operation {
         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
 
         final Executor executor = params.getExecutor();
-        final OperationOutcome outcome = params.makeOutcome();
+        final OperationOutcome outcome = params.makeOutcome(getTargetEntity());
         final CallbackManager callbacks = new CallbackManager();
 
         // this operation attempt gets its own controller
@@ -287,7 +392,7 @@ public abstract class OperationPartial implements Operation {
      * @return {@code true} if the outcome was successful
      */
     protected boolean isSuccess(OperationOutcome outcome) {
-        return (outcome.getResult() == PolicyResult.SUCCESS);
+        return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
     }
 
     /**
@@ -363,35 +468,47 @@ public abstract class OperationPartial implements Operation {
      */
     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
 
-        return operation -> {
-            if (operation != null && !isActorFailed(operation)) {
-                /*
-                 * wrong type or wrong operation - just leave it as is. No need to log
-                 * anything here, as retryOnFailure() will log a message
-                 */
-                return operation;
+        return origOutcome -> {
+            // ensure we have a non-null outcome
+            OperationOutcome outcome;
+            if (origOutcome != null) {
+                outcome = origOutcome;
+            } else {
+                logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
+                outcome = this.setOutcome(params.makeOutcome(getTargetEntity()), PolicyResult.FAILURE);
             }
 
-            // get a non-null operation
-            OperationOutcome oper2;
-            if (operation != null) {
-                oper2 = operation;
-            } else {
-                oper2 = params.makeOutcome();
-                oper2.setResult(PolicyResult.FAILURE);
+            // ensure correct actor/operation
+            outcome.setActor(getActorName());
+            outcome.setOperation(getName());
+
+            // determine if we should retry, based on the result
+            if (outcome.getResult() != PolicyResult.FAILURE) {
+                // do not retry success or other failure types (e.g., exception)
+                outcome.setFinalOutcome(true);
+                return outcome;
             }
 
             int retry = getRetry(params.getRetry());
-            if (retry > 0 && attempt > retry) {
+            if (retry <= 0) {
+                // no retries were specified
+                outcome.setFinalOutcome(true);
+
+            } else if (attempt <= retry) {
+                // have more retries - not the final outcome
+                outcome.setFinalOutcome(false);
+
+            } else {
                 /*
                  * retries were specified and we've already tried them all - change to
                  * FAILURE_RETRIES
                  */
                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
-                oper2.setResult(PolicyResult.FAILURE_RETRIES);
+                outcome.setResult(PolicyResult.FAILURE_RETRIES);
+                outcome.setFinalOutcome(true);
             }
 
-            return oper2;
+            return outcome;
         };
     }
 
@@ -457,10 +574,16 @@ public abstract class OperationPartial implements Operation {
     private Function<Throwable, OperationOutcome> fromException(String type) {
 
         return thrown -> {
-            OperationOutcome outcome = params.makeOutcome();
+            OperationOutcome outcome = params.makeOutcome(getTargetEntity());
 
-            logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
-                            params.getRequestId(), thrown);
+            if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
+                // do not include exception in the message, as it just clutters the log
+                logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
+                                params.getRequestId());
+            } else {
+                logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
+                                params.getRequestId(), thrown);
+            }
 
             return setOutcome(outcome, thrown);
         };
@@ -470,103 +593,108 @@ public abstract class OperationPartial implements Operation {
      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
      * any outstanding futures when one completes.
      *
-     * @param futures futures for which to wait
-     * @return a future to cancel or await an outcome. If this future is canceled, then
-     *         all of the futures will be canceled
+     * @param futureMakers function to make a future. If the function returns
+     *        {@code null}, then no future is created for that function. On the other
+     *        hand, if the function throws an exception, then the previously created
+     *        functions are canceled and the exception is re-thrown
+     * @return a future to cancel or await an outcome, or {@code null} if no futures were
+     *         created. If this future is canceled, then all of the futures will be
+     *         canceled
      */
-    protected CompletableFuture<OperationOutcome> anyOf(List<CompletableFuture<OperationOutcome>> futures) {
-
-        // convert list to an array
-        @SuppressWarnings("rawtypes")
-        CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+    public CompletableFuture<OperationOutcome> anyOf(
+                    @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
 
-        @SuppressWarnings("unchecked")
-        CompletableFuture<OperationOutcome> result = anyOf(arrFutures);
-        return result;
+        return anyOf(Arrays.asList(futureMakers));
     }
 
     /**
-     * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
-     * outstanding futures when one completes.
+     * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
+     * any outstanding futures when one completes.
      *
-     * @param futures futures for which to wait
-     * @return a future to cancel or await an outcome. If this future is canceled, then
-     *         all of the futures will be canceled
+     * @param futureMakers function to make a future. If the function returns
+     *        {@code null}, then no future is created for that function. On the other
+     *        hand, if the function throws an exception, then the previously created
+     *        functions are canceled and the exception is re-thrown
+     * @return a future to cancel or await an outcome, or {@code null} if no futures were
+     *         created. If this future is canceled, then all of the futures will be
+     *         canceled. Similarly, when this future completes, any incomplete futures
+     *         will be canceled
      */
-    protected CompletableFuture<OperationOutcome> anyOf(
-                    @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+    public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+
+        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+        CompletableFuture<OperationOutcome>[] futures =
+                        attachFutures(controller, futureMakers, UnaryOperator.identity());
+
+        if (futures.length == 0) {
+            // no futures were started
+            return null;
+        }
 
         if (futures.length == 1) {
             return futures[0];
         }
 
-        final Executor executor = params.getExecutor();
-        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        attachFutures(controller, futures);
-
-        // @formatter:off
-        CompletableFuture.anyOf(futures)
-                            .thenApply(object -> (OperationOutcome) object)
-                            .whenCompleteAsync(controller.delayedComplete(), executor);
-        // @formatter:on
+        CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
+                        .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
 
         return controller;
     }
 
     /**
-     * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
-     * the futures if returned future is canceled. The future returns the "worst" outcome,
-     * based on priority (see {@link #detmPriority(OperationOutcome)}).
+     * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
      *
-     * @param futures futures for which to wait
-     * @return a future to cancel or await an outcome. If this future is canceled, then
-     *         all of the futures will be canceled
+     * @param futureMakers function to make a future. If the function returns
+     *        {@code null}, then no future is created for that function. On the other
+     *        hand, if the function throws an exception, then the previously created
+     *        functions are canceled and the exception is re-thrown
+     * @return a future to cancel or await an outcome, or {@code null} if no futures were
+     *         created. If this future is canceled, then all of the futures will be
+     *         canceled
      */
-    protected CompletableFuture<OperationOutcome> allOf(List<CompletableFuture<OperationOutcome>> futures) {
+    public CompletableFuture<OperationOutcome> allOf(
+                    @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
 
-        // convert list to an array
-        @SuppressWarnings("rawtypes")
-        CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
-
-        @SuppressWarnings("unchecked")
-        CompletableFuture<OperationOutcome> result = allOf(arrFutures);
-        return result;
+        return allOf(Arrays.asList(futureMakers));
     }
 
     /**
-     * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
-     * futures if returned future is canceled. The future returns the "worst" outcome,
-     * based on priority (see {@link #detmPriority(OperationOutcome)}).
+     * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
      *
-     * @param futures futures for which to wait
-     * @return a future to cancel or await an outcome. If this future is canceled, then
-     *         all of the futures will be canceled
+     * @param futureMakers function to make a future. If the function returns
+     *        {@code null}, then no future is created for that function. On the other
+     *        hand, if the function throws an exception, then the previously created
+     *        functions are canceled and the exception is re-thrown
+     * @return a future to cancel or await an outcome, or {@code null} if no futures were
+     *         created. If this future is canceled, then all of the futures will be
+     *         canceled. Similarly, when this future completes, any incomplete futures
+     *         will be canceled
      */
-    protected CompletableFuture<OperationOutcome> allOf(
-                    @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+    public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
-        if (futures.length == 1) {
-            return futures[0];
-        }
-
-        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+        Queue<OperationOutcome> outcomes = new LinkedList<>();
 
-        attachFutures(controller, futures);
+        CompletableFuture<OperationOutcome>[] futures =
+                        attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
+                            synchronized (outcomes) {
+                                outcomes.add(outcome);
+                            }
+                            return outcome;
+                        }));
 
-        OperationOutcome[] outcomes = new OperationOutcome[futures.length];
-
-        @SuppressWarnings("rawtypes")
-        CompletableFuture[] futures2 = new CompletableFuture[futures.length];
+        if (futures.length == 0) {
+            // no futures were started
+            return null;
+        }
 
-        // record the outcomes of each future when it completes
-        for (int count = 0; count < futures2.length; ++count) {
-            final int count2 = count;
-            futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
+        if (futures.length == 1) {
+            return futures[0];
         }
 
         // @formatter:off
-        CompletableFuture.allOf(futures2)
+        CompletableFuture.allOf(futures)
                         .thenApply(unused -> combineOutcomes(outcomes))
                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
         // @formatter:on
@@ -575,22 +703,62 @@ public abstract class OperationPartial implements Operation {
     }
 
     /**
-     * Attaches the given futures to the controller.
+     * Invokes the functions to create the futures and attaches them to the controller.
      *
      * @param controller master controller for all of the futures
-     * @param futures futures to be attached to the controller
+     * @param futureMakers futures to be attached to the controller
+     * @param adorn function that "adorns" the future, possible adding onto its pipeline.
+     *        Returns the adorned future
+     * @return an array of futures, possibly zero-length. If the array is of size one,
+     *         then that one item should be returned instead of the controller
      */
-    private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
-                    @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+    private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
+                    List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
+                    UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
+
+        if (futureMakers.isEmpty()) {
+            @SuppressWarnings("unchecked")
+            CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
+            return result;
+        }
 
-        if (futures.length == 0) {
-            throw new IllegalArgumentException("empty list of futures");
+        // the last, unadorned future that is created
+        CompletableFuture<OperationOutcome> lastFuture = null;
+
+        List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
+
+        // make each future
+        for (var maker : futureMakers) {
+            try {
+                CompletableFuture<OperationOutcome> future = maker.get();
+                if (future == null) {
+                    continue;
+                }
+
+                // propagate "stop" to the future
+                controller.add(future);
+
+                futures.add(adorn.apply(future));
+
+                lastFuture = future;
+
+            } catch (RuntimeException e) {
+                logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
+                controller.cancel(false);
+                throw e;
+            }
         }
 
-        // attach each task
-        for (CompletableFuture<OperationOutcome> future : futures) {
-            controller.add(future);
+        @SuppressWarnings("unchecked")
+        CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
+
+        if (result.length == 1) {
+            // special case - return the unadorned future
+            result[0] = lastFuture;
+            return result;
         }
+
+        return futures.toArray(result);
     }
 
     /**
@@ -599,15 +767,13 @@ public abstract class OperationPartial implements Operation {
      * @param outcomes outcomes to be examined
      * @return the combined outcome
      */
-    private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) {
+    private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
 
         // identify the outcome with the highest priority
-        OperationOutcome outcome = outcomes[0];
+        OperationOutcome outcome = outcomes.remove();
         int priority = detmPriority(outcome);
 
-        // start with "1", as we've already dealt with "0"
-        for (int count = 1; count < outcomes.length; ++count) {
-            OperationOutcome outcome2 = outcomes[count];
+        for (OperationOutcome outcome2 : outcomes) {
             int priority2 = detmPriority(outcome2);
 
             if (priority2 > priority) {
@@ -656,71 +822,113 @@ public abstract class OperationPartial implements Operation {
     }
 
     /**
-     * Performs a task, after verifying that the controller is still running. Also checks
-     * that the previous outcome was successful, if specified.
+     * Performs a sequence of tasks, stopping if a task fails. A given task's future is
+     * not created until the previous task completes. The pipeline returns the outcome of
+     * the last task executed.
      *
-     * @param controller overall pipeline controller
-     * @param checkSuccess {@code true} to check the previous outcome, {@code false}
-     *        otherwise
-     * @param outcome outcome of the previous task
-     * @param task task to be performed
-     * @return the task, if everything checks out. Otherwise, it returns an incomplete
-     *         future and completes the controller instead
+     * @param futureMakers functions to make the futures
+     * @return a future to cancel the sequence or await the outcome
      */
-    // @formatter:off
-    protected CompletableFuture<OperationOutcome> doTask(
-                    PipelineControllerFuture<OperationOutcome> controller,
-                    boolean checkSuccess, OperationOutcome outcome,
-                    CompletableFuture<OperationOutcome> task) {
-        // @formatter:on
+    public CompletableFuture<OperationOutcome> sequence(
+                    @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
 
-        if (checkSuccess && !isSuccess(outcome)) {
-            /*
-             * must complete before canceling so that cancel() doesn't cause controller to
-             * complete
-             */
-            controller.complete(outcome);
-            task.cancel(false);
-            return new CompletableFuture<>();
+        return sequence(Arrays.asList(futureMakers));
+    }
+
+    /**
+     * Performs a sequence of tasks, stopping if a task fails. A given task's future is
+     * not created until the previous task completes. The pipeline returns the outcome of
+     * the last task executed.
+     *
+     * @param futureMakers functions to make the futures
+     * @return a future to cancel the sequence or await the outcome, or {@code null} if
+     *         there were no tasks to perform
+     */
+    public CompletableFuture<OperationOutcome> sequence(
+                    List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+
+        Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
+
+        CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
+        if (nextTask == null) {
+            // no tasks
+            return null;
+        }
+
+        if (queue.isEmpty()) {
+            // only one task - just return it rather than wrapping it in a controller
+            return nextTask;
         }
 
-        return controller.wrap(task);
+        /*
+         * multiple tasks - need a controller to stop whichever task is currently
+         * executing
+         */
+        final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+        final Executor executor = params.getExecutor();
+
+        // @formatter:off
+        controller.wrap(nextTask)
+                    .thenCompose(nextTaskOnSuccess(controller, queue))
+                    .whenCompleteAsync(controller.delayedComplete(), executor);
+        // @formatter:on
+
+        return controller;
     }
 
     /**
-     * Performs a task, after verifying that the controller is still running. Also checks
-     * that the previous outcome was successful, if specified.
+     * Executes the next task in the queue, if the previous outcome was successful.
      *
-     * @param controller overall pipeline controller
-     * @param checkSuccess {@code true} to check the previous outcome, {@code false}
-     *        otherwise
-     * @param task function to start the task to be performed
-     * @return a function to perform the task. If everything checks out, then it returns
-     *         the task. Otherwise, it returns an incomplete future and completes the
-     *         controller instead
+     * @param controller pipeline controller
+     * @param taskQueue queue of tasks to be performed
+     * @return a future to execute the remaining tasks, or the current outcome, if it's a
+     *         failure, or if there are no more tasks
      */
-    // @formatter:off
-    protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(
+    private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
                     PipelineControllerFuture<OperationOutcome> controller,
-                    boolean checkSuccess,
-                    Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
-        // @formatter:on
+                    Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
 
         return outcome -> {
-
-            if (!controller.isRunning()) {
-                return new CompletableFuture<>();
+            if (!isSuccess(outcome)) {
+                // return the failure
+                return CompletableFuture.completedFuture(outcome);
             }
 
-            if (checkSuccess && !isSuccess(outcome)) {
-                controller.complete(outcome);
-                return new CompletableFuture<>();
+            CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
+            if (nextTask == null) {
+                // no tasks - just return the success
+                return CompletableFuture.completedFuture(outcome);
             }
 
-            return controller.wrap(task.apply(outcome));
+            // @formatter:off
+            return controller
+                        .wrap(nextTask)
+                        .thenCompose(nextTaskOnSuccess(controller, taskQueue));
+            // @formatter:on
         };
     }
 
+    /**
+     * Gets the next task from the queue, skipping those that are {@code null}.
+     *
+     * @param taskQueue task queue
+     * @return the next task, or {@code null} if the queue is now empty
+     */
+    private CompletableFuture<OperationOutcome> getNextTask(
+                    Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
+
+        Supplier<CompletableFuture<OperationOutcome>> maker;
+
+        while ((maker = taskQueue.poll()) != null) {
+            CompletableFuture<OperationOutcome> future = maker.get();
+            if (future != null) {
+                return future;
+            }
+        }
+
+        return null;
+    }
+
     /**
      * Sets the start time of the operation and invokes the callback to indicate that the
      * operation has started. Does nothing if the pipeline has been stopped.
@@ -730,15 +938,19 @@ public abstract class OperationPartial implements Operation {
      * @param callbacks used to determine if the start callback can be invoked
      * @return a function that sets the start time and invokes the callback
      */
-    private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
+    protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
 
         return (outcome, thrown) -> {
 
             if (callbacks.canStart()) {
-                // haven't invoked "start" callback yet
+                outcome.setSubRequestId(getSubRequestId());
                 outcome.setStart(callbacks.getStartTime());
                 outcome.setEnd(null);
-                params.callbackStarted(outcome);
+
+                // pass a copy to the callback
+                OperationOutcome outcome2 = new OperationOutcome(outcome);
+                outcome2.setFinalOutcome(false);
+                params.callbackStarted(outcome2);
             }
         };
     }
@@ -756,14 +968,16 @@ public abstract class OperationPartial implements Operation {
      * @param callbacks used to determine if the end callback can be invoked
      * @return a function that sets the end time and invokes the callback
      */
-    private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
+    protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
 
         return (outcome, thrown) -> {
-
             if (callbacks.canEnd()) {
+                outcome.setSubRequestId(getSubRequestId());
                 outcome.setStart(callbacks.getStartTime());
                 outcome.setEnd(callbacks.getEndTime());
-                params.callbackCompleted(outcome);
+
+                // pass a copy to the callback
+                params.callbackCompleted(new OperationOutcome(outcome));
             }
         };
     }
@@ -774,7 +988,7 @@ public abstract class OperationPartial implements Operation {
      * @param operation operation to be updated
      * @return the updated operation
      */
-    protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
+    public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
         return setOutcome(operation, result);
     }
@@ -809,6 +1023,54 @@ public abstract class OperationPartial implements Operation {
         return (thrown instanceof TimeoutException);
     }
 
+    /**
+     * Logs a message. If the message is not of type, String, then it attempts to
+     * pretty-print it into JSON before logging.
+     *
+     * @param direction IN or OUT
+     * @param infra communication infrastructure on which it was published
+     * @param source source name (e.g., the URL or Topic name)
+     * @param message message to be logged
+     * @return the JSON text that was logged
+     */
+    public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
+        String json;
+        try {
+            json = prettyPrint(message);
+
+        } catch (IllegalArgumentException e) {
+            String type = (direction == EventType.IN ? "response" : "request");
+            logger.warn("cannot pretty-print {}", type, e);
+            json = message.toString();
+        }
+
+        logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
+
+        return json;
+    }
+
+    /**
+     * Converts a message to a "pretty-printed" String using the operation's normal
+     * serialization provider (i.e., it's <i>coder</i>).
+     *
+     * @param message response to be logged
+     * @return the JSON text that was logged
+     * @throws IllegalArgumentException if the message cannot be converted
+     */
+    public <T> String prettyPrint(T message) {
+        if (message == null) {
+            return null;
+        } else if (message instanceof String) {
+            return message.toString();
+        } else {
+            try {
+                return getCoder().encode(message, true);
+            } catch (CoderException e) {
+                throw new IllegalArgumentException("cannot encode message", e);
+            }
+        }
+    }
+
     // these may be overridden by subclasses or junit tests
 
     /**
@@ -830,6 +1092,16 @@ public abstract class OperationPartial implements Operation {
         return DEFAULT_RETRY_WAIT_MS;
     }
 
+    /**
+     * Gets the target entity, first trying the properties and then the parameters.
+     *
+     * @return the target entity
+     */
+    protected String getTargetEntity() {
+        String targetEntity = getProperty(OperationProperties.AAI_TARGET_ENTITY);
+        return (targetEntity != null ? targetEntity : params.getTargetEntity());
+    }
+
     /**
      * Gets the operation timeout.
      *
@@ -841,4 +1113,10 @@ public abstract class OperationPartial implements Operation {
     protected long getTimeoutMs(Integer timeoutSec) {
         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
     }
+
+    // these may be overridden by junit tests
+
+    protected Coder getCoder() {
+        return coder;
+    }
 }