Make targetEntity a property
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
index f28c1f6..6db824f 100644 (file)
@@ -20,8 +20,7 @@
 
 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -29,9 +28,16 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import ch.qos.logback.classic.Logger;
 import java.time.Instant;
+import java.util.ArrayDeque;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -40,47 +46,82 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.Setter;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
+import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.onap.policy.common.utils.time.PseudoExecutor;
 import org.onap.policy.controlloop.ControlLoopOperation;
 import org.onap.policy.controlloop.VirtualControlLoopEvent;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
+import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
 import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.LoggerFactory;
 
 public class OperationPartialTest {
-    private static final int MAX_PARALLEL_REQUESTS = 10;
+    private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
+    private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
+    private static final int MAX_REQUESTS = 100;
+    private static final int MAX_PARALLEL = 10;
     private static final String EXPECTED_EXCEPTION = "expected exception";
     private static final String ACTOR = "my-actor";
     private static final String OPERATION = "my-operation";
-    private static final String TARGET = "my-target";
+    private static final String MY_SINK = "my-sink";
+    private static final String MY_SOURCE = "my-source";
+    private static final String MY_TARGET_ENTITY = "my-entity";
+    private static final String TEXT = "my-text";
     private static final int TIMEOUT = 1000;
     private static final UUID REQ_ID = UUID.randomUUID();
 
     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
 
+    /**
+     * Used to attach an appender to the class' logger.
+     */
+    private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
+    private static final ExtractAppender appender = new ExtractAppender();
+
+    private static final List<String> PROP_NAMES = List.of("hello", "world");
+
+    @Mock
+    private ActorService service;
+    @Mock
+    private Actor guardActor;
+    @Mock
+    private Operator guardOperator;
+    @Mock
+    private Operation guardOperation;
+
     private VirtualControlLoopEvent event;
     private ControlLoopEventContext context;
-    private MyExec executor;
+    private PseudoExecutor executor;
     private ControlLoopOperationParams params;
 
     private MyOper oper;
@@ -93,37 +134,56 @@ public class OperationPartialTest {
     private OperationOutcome opstart;
     private OperationOutcome opend;
 
-    private OperatorPartial operator;
+    private Deque<OperationOutcome> starts;
+    private Deque<OperationOutcome> ends;
+
+    private OperatorConfig config;
+
+    /**
+     * Attaches the appender to the logger.
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        /*
+         * Attach appender to the logger.
+         */
+        appender.setContext(logger.getLoggerContext());
+        appender.start();
+
+        logger.addAppender(appender);
+    }
+
+    /**
+     * Stops the appender.
+     */
+    @AfterClass
+    public static void tearDownAfterClass() {
+        appender.stop();
+    }
 
     /**
      * Initializes the fields, including {@link #oper}.
      */
     @Before
     public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
         event = new VirtualControlLoopEvent();
         event.setRequestId(REQ_ID);
 
         context = new ControlLoopEventContext(event);
-        executor = new MyExec(100 * MAX_PARALLEL_REQUESTS);
+        executor = new PseudoExecutor();
 
         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
-                        .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
-                        .startCallback(this::starter).targetEntity(TARGET).build();
+                        .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
+                        .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
 
-        operator = new OperatorPartial(ACTOR, OPERATION) {
-            @Override
-            public Executor getBlockingExecutor() {
-                return executor;
-            }
+        when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
+        when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
+        when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
+        when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
 
-            @Override
-            public Operation buildOperation(ControlLoopOperationParams params) {
-                return null;
-            }
-        };
-
-        operator.configure(null);
-        operator.start();
+        config = new OperatorConfig(executor);
 
         oper = new MyOper();
 
@@ -131,6 +191,9 @@ public class OperationPartialTest {
 
         opstart = null;
         opend = null;
+
+        starts = new ArrayDeque<>(10);
+        ends = new ArrayDeque<>(10);
     }
 
     @Test
@@ -157,51 +220,25 @@ public class OperationPartialTest {
         assertNull(future.get(5, TimeUnit.SECONDS));
     }
 
-    /**
-     * Exercises the doXxx() methods.
-     */
     @Test
-    public void testDoXxx() {
-        assertThatCode(() -> operator.doConfigure(null)).doesNotThrowAnyException();
-        assertThatCode(() -> operator.doStart()).doesNotThrowAnyException();
-        assertThatCode(() -> operator.doStop()).doesNotThrowAnyException();
-        assertThatCode(() -> operator.doShutdown()).doesNotThrowAnyException();
-
+    public void testGetPropertyNames() {
+        assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
     }
 
     @Test
-    public void testStart() {
-        verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
+    public void testGetProperty_testSetProperty() {
+        oper.setProperty("propertyA", "valueA");
+        oper.setProperty("propertyB", "valueB");
+        oper.setProperty("propertyC", 20);
+
+        assertEquals("valueA", oper.getProperty("propertyA"));
+        assertEquals("valueB", oper.getProperty("propertyB"));
+        assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
     }
 
-    /**
-     * Tests startOperation() when the operator is not running.
-     */
     @Test
-    public void testStartNotRunning() {
-        // stop the operator
-        operator.stop();
-
-        assertThatIllegalStateException().isThrownBy(() -> oper.start());
-    }
-
-    /**
-     * Tests startOperation() when the operation has a preprocessor.
-     */
-    @Test
-    public void testStartWithPreprocessor() {
-        AtomicInteger count = new AtomicInteger();
-
-        CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
-            count.incrementAndGet();
-            return makeSuccess();
-        }, executor);
-
-        oper.setGuard(preproc);
-
-        verifyRun("testStartWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
-
-        assertEquals(1, count.get());
+    public void testStart() {
+        verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
     }
 
     /**
@@ -209,19 +246,19 @@ public class OperationPartialTest {
      */
     @Test
     public void testStartMultiple() {
-        for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
+        for (int count = 0; count < MAX_PARALLEL; ++count) {
             oper.start();
         }
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
 
         assertNotNull(opstart);
         assertNotNull(opend);
         assertEquals(PolicyResult.SUCCESS, opend.getResult());
 
-        assertEquals(MAX_PARALLEL_REQUESTS, numStart);
-        assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
-        assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
+        assertEquals(MAX_PARALLEL, numStart);
+        assertEquals(MAX_PARALLEL, oper.getCount());
+        assertEquals(MAX_PARALLEL, numEnd);
     }
 
     /**
@@ -229,7 +266,7 @@ public class OperationPartialTest {
      */
     @Test
     public void testStartPreprocessorFailure() {
-        oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
+        oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
 
         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
     }
@@ -240,7 +277,7 @@ public class OperationPartialTest {
     @Test
     public void testStartPreprocessorException() {
         // arrange for the preprocessor to throw an exception
-        oper.setGuard(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
+        oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
 
         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
     }
@@ -251,10 +288,10 @@ public class OperationPartialTest {
     @Test
     public void testStartPreprocessorNotRunning() {
         // arrange for the preprocessor to return success, which will be ignored
-        oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
+        // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
 
         oper.start().cancel(false);
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         assertNull(opstart);
         assertNull(opend);
@@ -288,20 +325,66 @@ public class OperationPartialTest {
     }
 
     @Test
-    public void testStartGuardAsync() {
-        assertNull(oper.startGuardAsync());
+    public void testStartGuardAsync() throws Exception {
+        CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
+        assertTrue(future.isDone());
+        assertEquals(PolicyResult.SUCCESS, future.get().getResult());
+
+        // verify the parameters that were passed
+        ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
+                        ArgumentCaptor.forClass(ControlLoopOperationParams.class);
+        verify(guardOperator).buildOperation(paramsCaptor.capture());
+
+        params = paramsCaptor.getValue();
+        assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
+        assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
+        assertNull(params.getRetry());
+        assertNull(params.getTimeoutSec());
+
+        Map<String, Object> payload = params.getPayload();
+        assertNotNull(payload);
+
+        assertEquals(oper.makeGuardPayload(), payload);
+    }
+
+    /**
+     * Tests startGuardAsync() when preprocessing is disabled.
+     */
+    @Test
+    public void testStartGuardAsyncDisabled() {
+        params = params.toBuilder().preprocessed(true).build();
+        assertNull(new MyOper().startGuardAsync());
+    }
+
+    @Test
+    public void testMakeGuardPayload() {
+        Map<String, Object> payload = oper.makeGuardPayload();
+        assertSame(REQ_ID, payload.get("requestId"));
+
+        // request id changes, so remove it
+        payload.remove("requestId");
+
+        assertEquals("{actor=my-actor, operation=my-operation, target=my-entity}", payload.toString());
+
+        // repeat, but with closed loop name
+        event.setClosedLoopControlName("my-loop");
+        payload = oper.makeGuardPayload();
+        payload.remove("requestId");
+        assertEquals("{actor=my-actor, operation=my-operation, target=my-entity, clname=my-loop}", payload.toString());
     }
 
     @Test
     public void testStartOperationAsync() {
         oper.start();
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         assertEquals(1, oper.getCount());
     }
 
     @Test
     public void testIsSuccess() {
+        assertFalse(oper.isSuccess(null));
+
         OperationOutcome outcome = new OperationOutcome();
 
         outcome.setResult(PolicyResult.SUCCESS);
@@ -317,7 +400,7 @@ public class OperationPartialTest {
     public void testIsActorFailed() {
         assertFalse(oper.isActorFailed(null));
 
-        OperationOutcome outcome = params.makeOutcome();
+        OperationOutcome outcome = params.makeOutcome(null);
 
         // incorrect outcome
         outcome.setResult(PolicyResult.SUCCESS);
@@ -330,14 +413,14 @@ public class OperationPartialTest {
         outcome.setResult(PolicyResult.FAILURE);
 
         // incorrect actor
-        outcome.setActor(TARGET);
+        outcome.setActor(MY_SINK);
         assertFalse(oper.isActorFailed(outcome));
         outcome.setActor(null);
         assertFalse(oper.isActorFailed(outcome));
         outcome.setActor(ACTOR);
 
         // incorrect operation
-        outcome.setOperation(TARGET);
+        outcome.setOperation(MY_SINK);
         assertFalse(oper.isActorFailed(outcome));
         outcome.setOperation(null);
         assertFalse(oper.isActorFailed(outcome));
@@ -352,10 +435,10 @@ public class OperationPartialTest {
         /*
          * Use an operation that doesn't override doOperation().
          */
-        OperationPartial oper2 = new OperationPartial(params, operator) {};
+        OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
 
         oper2.start();
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         assertNotNull(opend);
         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
@@ -377,7 +460,7 @@ public class OperationPartialTest {
             @Override
             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
-                OperationOutcome outcome2 = params.makeOutcome();
+                OperationOutcome outcome2 = params.makeOutcome(null);
                 outcome2.setResult(PolicyResult.SUCCESS);
 
                 /*
@@ -469,15 +552,15 @@ public class OperationPartialTest {
         // arrange to return null from doOperation()
         oper = new MyOper() {
             @Override
-            protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
+            protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
 
                 // update counters
-                super.doOperation(attempt, operation);
+                super.doOperation(attempt, outcome);
                 return null;
             }
         };
 
-        verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
+        verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, noop());
     }
 
     @Test
@@ -514,19 +597,19 @@ public class OperationPartialTest {
     public void testIsSameOperation() {
         assertFalse(oper.isSameOperation(null));
 
-        OperationOutcome outcome = params.makeOutcome();
+        OperationOutcome outcome = params.makeOutcome(null);
 
         // wrong actor - should be false
         outcome.setActor(null);
         assertFalse(oper.isSameOperation(outcome));
-        outcome.setActor(TARGET);
+        outcome.setActor(MY_SINK);
         assertFalse(oper.isSameOperation(outcome));
         outcome.setActor(ACTOR);
 
         // wrong operation - should be null
         outcome.setOperation(null);
         assertFalse(oper.isSameOperation(outcome));
-        outcome.setOperation(TARGET);
+        outcome.setOperation(MY_SINK);
         assertFalse(oper.isSameOperation(outcome));
         outcome.setOperation(OPERATION);
 
@@ -537,8 +620,8 @@ public class OperationPartialTest {
      * Tests handleFailure() when the outcome is a success.
      */
     @Test
-    public void testHandlePreprocessorFailureTrue() {
-        oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
+    public void testHandlePreprocessorFailureSuccess() {
+        oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
     }
 
@@ -546,8 +629,8 @@ public class OperationPartialTest {
      * Tests handleFailure() when the outcome is <i>not</i> a success.
      */
     @Test
-    public void testHandlePreprocessorFailureFalse() throws Exception {
-        oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
+    public void testHandlePreprocessorFailureFailed() throws Exception {
+        oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
     }
 
@@ -556,9 +639,8 @@ public class OperationPartialTest {
      */
     @Test
     public void testHandlePreprocessorFailureNull() throws Exception {
-        // arrange to return null from the preprocessor
-        oper.setGuard(CompletableFuture.completedFuture(null));
-
+        // arrange to return a null outcome from the preprocessor
+        oper.setPreProc(CompletableFuture.completedFuture(null));
         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
     }
 
@@ -584,43 +666,47 @@ public class OperationPartialTest {
     @Test
     public void testAnyOf() throws Exception {
         // first task completes, others do not
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
-        final OperationOutcome outcome = params.makeOutcome();
+        final OperationOutcome outcome = params.makeOutcome(null);
 
-        tasks.add(CompletableFuture.completedFuture(outcome));
-        tasks.add(new CompletableFuture<>());
-        tasks.add(new CompletableFuture<>());
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> null);
+        tasks.add(() -> new CompletableFuture<>());
 
         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
 
+        // repeat using array form
+        @SuppressWarnings("unchecked")
+        Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
+        result = oper.anyOf(tasks.toArray(taskArray));
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
 
         // second task completes, others do not
-        tasks = new LinkedList<>();
-
-        tasks.add(new CompletableFuture<>());
-        tasks.add(CompletableFuture.completedFuture(outcome));
-        tasks.add(new CompletableFuture<>());
+        tasks.clear();
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> new CompletableFuture<>());
 
         result = oper.anyOf(tasks);
-        assertTrue(executor.runAll());
-
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
 
         // third task completes, others do not
-        tasks = new LinkedList<>();
-
-        tasks.add(new CompletableFuture<>());
-        tasks.add(new CompletableFuture<>());
-        tasks.add(CompletableFuture.completedFuture(outcome));
+        tasks.clear();
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
         result = oper.anyOf(tasks);
-        assertTrue(executor.runAll());
-
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
     }
@@ -631,54 +717,82 @@ public class OperationPartialTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testAnyOfEdge() throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
         // zero items: check both using a list and using an array
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf(tasks));
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf());
+        assertNull(oper.anyOf(tasks));
+        assertNull(oper.anyOf());
 
         // one item: : check both using a list and using an array
         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
-        tasks.add(future1);
+        tasks.add(() -> future1);
 
         assertSame(future1, oper.anyOf(tasks));
-        assertSame(future1, oper.anyOf(future1));
+        assertSame(future1, oper.anyOf(() -> future1));
     }
 
-    /**
-     * Tests both flavors of allOf(), because one invokes the other.
-     */
     @Test
-    public void testAllOf() throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+    public void testAllOfArray() throws Exception {
+        final OperationOutcome outcome = params.makeOutcome(null);
+
+        CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
-        final OperationOutcome outcome = params.makeOutcome();
+        @SuppressWarnings("unchecked")
+        CompletableFuture<OperationOutcome> result =
+                        oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
+
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(result.isDone());
+        future1.complete(outcome);
+
+        // complete 3 before 2
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(result.isDone());
+        future3.complete(outcome);
+
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(result.isDone());
+        future2.complete(outcome);
+
+        // all of them are now done
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
+    }
+
+    @Test
+    public void testAllOfList() throws Exception {
+        final OperationOutcome outcome = params.makeOutcome(null);
 
         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
-        tasks.add(future1);
-        tasks.add(future2);
-        tasks.add(future3);
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+        tasks.add(() -> future1);
+        tasks.add(() -> future2);
+        tasks.add(() -> null);
+        tasks.add(() -> future3);
 
         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertFalse(result.isDone());
         future1.complete(outcome);
 
         // complete 3 before 2
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertFalse(result.isDone());
         future3.complete(outcome);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertFalse(result.isDone());
         future2.complete(outcome);
 
         // all of them are now done
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
     }
@@ -689,18 +803,41 @@ public class OperationPartialTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testAllOfEdge() throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
         // zero items: check both using a list and using an array
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf(tasks));
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf());
+        assertNull(oper.allOf(tasks));
+        assertNull(oper.allOf());
 
         // one item: : check both using a list and using an array
         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
-        tasks.add(future1);
+        tasks.add(() -> future1);
 
         assertSame(future1, oper.allOf(tasks));
-        assertSame(future1, oper.allOf(future1));
+        assertSame(future1, oper.allOf(() -> future1));
+    }
+
+    @Test
+    public void testAttachFutures() throws Exception {
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+
+        // third task throws an exception during construction
+        CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
+        tasks.add(() -> future1);
+        tasks.add(() -> future2);
+        tasks.add(() -> {
+            throw new IllegalStateException(EXPECTED_EXCEPTION);
+        });
+        tasks.add(() -> future3);
+
+        assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
+
+        // should have canceled the first two, but not the last
+        assertTrue(future1.isCancelled());
+        assertTrue(future2.isCancelled());
+        assertFalse(future3.isCancelled());
     }
 
     @Test
@@ -714,12 +851,14 @@ public class OperationPartialTest {
         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
 
-        // null outcome
-        final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
-        tasks.add(CompletableFuture.completedFuture(null));
+        // null outcome - takes precedence over a success
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
+        tasks.add(() -> CompletableFuture.completedFuture(null));
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertNull(result.get());
 
@@ -727,26 +866,85 @@ public class OperationPartialTest {
         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
 
         tasks.clear();
-        tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
-        tasks.add(CompletableFuture.failedFuture(except));
-        tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
+        tasks.add(() -> CompletableFuture.failedFuture(except));
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
         result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isCompletedExceptionally());
         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
     }
 
-    private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+    /**
+     * Tests both flavors of sequence(), because one invokes the other.
+     */
+    @Test
+    public void testSequence() throws Exception {
+        final OperationOutcome outcome = params.makeOutcome(null);
+
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> null);
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+
+        CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
+
+        // repeat using array form
+        @SuppressWarnings("unchecked")
+        Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
+        result = oper.sequence(tasks.toArray(taskArray));
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
+
+        // second task fails, third should not run
+        OperationOutcome failure = params.makeOutcome(null);
+        failure.setResult(PolicyResult.FAILURE);
+        tasks.clear();
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> CompletableFuture.completedFuture(failure));
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+
+        result = oper.sequence(tasks);
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(failure, result.get());
+    }
+
+    /**
+     * Tests both flavors of sequence(), for edge cases: zero items, and one item.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testSequenceEdge() throws Exception {
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+
+        // zero items: check both using a list and using an array
+        assertNull(oper.sequence(tasks));
+        assertNull(oper.sequence());
 
+        // one item: : check both using a list and using an array
+        CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+        tasks.add(() -> future1);
+
+        assertSame(future1, oper.sequence(tasks));
+        assertSame(future1, oper.sequence(() -> future1));
+    }
+
+    private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
         OperationOutcome expectedOutcome = null;
 
         for (int count = 0; count < results.length; ++count) {
-            OperationOutcome outcome = params.makeOutcome();
+            OperationOutcome outcome = params.makeOutcome(null);
             outcome.setResult(results[count]);
-            tasks.add(CompletableFuture.completedFuture(outcome));
+            tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
             if (count == expected) {
                 expectedOutcome = outcome;
@@ -755,22 +953,16 @@ public class OperationPartialTest {
 
         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(expectedOutcome, result.get());
     }
 
-    private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
-                    final OperationOutcome taskOutcome) {
-
-        return outcome -> CompletableFuture.completedFuture(taskOutcome);
-    }
-
     @Test
     public void testDetmPriority() throws CoderException {
         assertEquals(1, oper.detmPriority(null));
 
-        OperationOutcome outcome = params.makeOutcome();
+        OperationOutcome outcome = params.makeOutcome(null);
 
         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
@@ -789,210 +981,6 @@ public class OperationPartialTest {
         assertEquals(1, oper.detmPriority(outcome));
     }
 
-    /**
-     * Tests doTask(Future) when the controller is not running.
-     */
-    @Test
-    public void testDoTaskFutureNotRunning() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-        controller.complete(params.makeOutcome());
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, params.makeOutcome(), taskFuture);
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should have canceled the task future
-        assertTrue(taskFuture.isCancelled());
-    }
-
-    /**
-     * Tests doTask(Future) when the previous outcome was successful.
-     */
-    @Test
-    public void testDoTaskFutureSuccess() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-        final OperationOutcome taskOutcome = params.makeOutcome();
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, params.makeOutcome(), taskFuture);
-
-        taskFuture.complete(taskOutcome);
-        assertTrue(executor.runAll());
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
-    /**
-     * Tests doTask(Future) when the previous outcome was failed.
-     */
-    @Test
-    public void testDoTaskFutureFailure() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, failedOutcome, taskFuture);
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should have canceled the task future
-        assertTrue(taskFuture.isCancelled());
-
-        // controller SHOULD be done now
-        assertTrue(controller.isDone());
-        assertSame(failedOutcome, controller.get());
-    }
-
-    /**
-     * Tests doTask(Future) when the previous outcome was failed, but not checking
-     * success.
-     */
-    @Test
-    public void testDoTaskFutureUncheckedFailure() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, failedOutcome, taskFuture);
-        assertFalse(future.isDone());
-
-        // complete the task
-        OperationOutcome taskOutcome = params.makeOutcome();
-        taskFuture.complete(taskOutcome);
-
-        assertTrue(executor.runAll());
-
-        // should have run the task
-        assertTrue(future.isDone());
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
-    /**
-     * Tests doTask(Function) when the controller is not running.
-     */
-    @Test
-    public void testDoTaskFunctionNotRunning() throws Exception {
-        AtomicBoolean invoked = new AtomicBoolean();
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
-            invoked.set(true);
-            return CompletableFuture.completedFuture(params.makeOutcome());
-        };
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-        controller.complete(params.makeOutcome());
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(params.makeOutcome());
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should not have even invoked the task
-        assertFalse(invoked.get());
-    }
-
-    /**
-     * Tests doTask(Function) when the previous outcome was successful.
-     */
-    @Test
-    public void testDoTaskFunctionSuccess() throws Exception {
-        final OperationOutcome taskOutcome = params.makeOutcome();
-
-        final OperationOutcome failedOutcome = params.makeOutcome();
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
-    /**
-     * Tests doTask(Function) when the previous outcome was failed.
-     */
-    @Test
-    public void testDoTaskFunctionFailure() throws Exception {
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        AtomicBoolean invoked = new AtomicBoolean();
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
-            invoked.set(true);
-            return CompletableFuture.completedFuture(params.makeOutcome());
-        };
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should not have even invoked the task
-        assertFalse(invoked.get());
-
-        // controller should have the failed task
-        assertTrue(controller.isDone());
-        assertSame(failedOutcome, controller.get());
-    }
-
-    /**
-     * Tests doTask(Function) when the previous outcome was failed, but not checking
-     * success.
-     */
-    @Test
-    public void testDoTaskFunctionUncheckedFailure() throws Exception {
-        final OperationOutcome taskOutcome = params.makeOutcome();
-
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(failedOutcome);
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
     /**
      * Tests callbackStarted() when the pipeline has already been stopped.
      */
@@ -1013,7 +1001,7 @@ public class OperationPartialTest {
         oper = new MyOper();
 
         future.set(oper.start());
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         // should have only run once
         assertEquals(1, numStart);
@@ -1035,7 +1023,7 @@ public class OperationPartialTest {
         oper = new MyOper();
 
         future.set(oper.start());
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         // should not have been set
         assertNull(opend);
@@ -1068,6 +1056,10 @@ public class OperationPartialTest {
         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
 
+        oper.setOutcome(outcome, PolicyResult.SUCCESS);
+        assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
+        assertEquals(PolicyResult.SUCCESS, outcome.getResult());
+
         for (PolicyResult result : FAILURE_RESULTS) {
             outcome = new OperationOutcome();
             oper.setOutcome(outcome, result);
@@ -1090,6 +1082,62 @@ public class OperationPartialTest {
         assertTrue(oper.isTimeout(new CompletionException(timex)));
     }
 
+    @Test
+    public void testLogMessage() {
+        final String infraStr = SINK_INFRA.toString();
+
+        // log structured data
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
+        List<String> output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
+                        .contains("{\n  \"text\": \"my-text\"\n}");
+
+        // repeat with a response
+        appender.clearExtractions();
+        oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
+                        .contains("{\n  \"text\": \"my-text\"\n}");
+
+        // log a plain string
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+        assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
+
+        // log a null request
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
+
+        // generate exception from coder
+        setOperCoderException();
+
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
+        output = appender.getExtracted();
+        assertEquals(2, output.size());
+        assertThat(output.get(0)).contains("cannot pretty-print request");
+        assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
+
+        // repeat with a response
+        appender.clearExtractions();
+        oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
+        output = appender.getExtracted();
+        assertEquals(2, output.size());
+        assertThat(output.get(0)).contains("cannot pretty-print response");
+        assertThat(output.get(1)).contains(MY_SOURCE);
+    }
+
     @Test
     public void testGetRetry() {
         assertEquals(0, oper.getRetry(null));
@@ -1099,10 +1147,20 @@ public class OperationPartialTest {
     @Test
     public void testGetRetryWait() {
         // need an operator that doesn't override the retry time
-        OperationPartial oper2 = new OperationPartial(params, operator) {};
+        OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
     }
 
+    @Test
+    public void testGetTargetEntity() {
+        // get it from the params
+        assertEquals(MY_TARGET_ENTITY, oper.getTargetEntity());
+
+        // now get it from the properties
+        oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, "entityX");
+        assertEquals("entityX", oper.getTargetEntity());
+    }
+
     @Test
     public void testGetTimeOutMs() {
         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
@@ -1119,11 +1177,13 @@ public class OperationPartialTest {
         ++numStart;
         tstart = oper.getStart();
         opstart = oper;
+        starts.add(oper);
     }
 
     private void completer(OperationOutcome oper) {
         ++numEnd;
         opend = oper;
+        ends.add(oper);
     }
 
     /**
@@ -1138,14 +1198,14 @@ public class OperationPartialTest {
     }
 
     private OperationOutcome makeSuccess() {
-        OperationOutcome outcome = params.makeOutcome();
+        OperationOutcome outcome = params.makeOutcome(null);
         outcome.setResult(PolicyResult.SUCCESS);
 
         return outcome;
     }
 
     private OperationOutcome makeFailure() {
-        OperationOutcome outcome = params.makeOutcome();
+        OperationOutcome outcome = params.makeOutcome(null);
         outcome.setResult(PolicyResult.FAILURE);
 
         return outcome;
@@ -1162,10 +1222,7 @@ public class OperationPartialTest {
     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
                     PolicyResult expectedResult) {
 
-        String expectedSubRequestId =
-                        (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
-
-        verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
+        verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
     }
 
     /**
@@ -1175,19 +1232,24 @@ public class OperationPartialTest {
      * @param expectedCallbacks number of callbacks expected
      * @param expectedOperations number of operation invocations expected
      * @param expectedResult expected outcome
-     * @param expectedSubRequestId expected sub request ID
      * @param manipulator function to modify the future returned by
      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
      *        in the executor are run
      */
     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
-                    String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
+                    Consumer<CompletableFuture<OperationOutcome>> manipulator) {
+
+        tstart = null;
+        opstart = null;
+        opend = null;
+        starts.clear();
+        ends.clear();
 
         CompletableFuture<OperationOutcome> future = oper.start();
 
         manipulator.accept(future);
 
-        assertTrue(testName, executor.runAll());
+        assertTrue(testName, executor.runAll(MAX_REQUESTS));
 
         assertEquals(testName, expectedCallbacks, numStart);
         assertEquals(testName, expectedCallbacks, numEnd);
@@ -1202,36 +1264,72 @@ public class OperationPartialTest {
 
             try {
                 assertTrue(future.isDone());
-                assertSame(testName, opend, future.get());
+                assertEquals(testName, opend, future.get());
+
+                // "start" is never final
+                for (OperationOutcome outcome : starts) {
+                    assertFalse(testName, outcome.isFinalOutcome());
+                }
+
+                // only the last "complete" is final
+                assertTrue(testName, ends.removeLast().isFinalOutcome());
+
+                for (OperationOutcome outcome : ends) {
+                    assertFalse(outcome.isFinalOutcome());
+                }
 
             } catch (InterruptedException | ExecutionException e) {
                 throw new IllegalStateException(e);
             }
 
             if (expectedOperations > 0) {
-                assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
+                assertNotNull(testName, oper.getSubRequestId());
+                assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
+                assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
             }
         }
 
         assertEquals(testName, expectedOperations, oper.getCount());
     }
 
+    /**
+     * Creates a new {@link #oper} whose coder will throw an exception.
+     */
+    private void setOperCoderException() {
+        oper = new MyOper() {
+            @Override
+            protected Coder getCoder() {
+                return new StandardCoder() {
+                    @Override
+                    public String encode(Object object, boolean pretty) throws CoderException {
+                        throw new CoderException(EXPECTED_EXCEPTION);
+                    }
+                };
+            }
+        };
+    }
+
+
+    @Getter
+    public static class MyData {
+        private String text = TEXT;
+    }
+
+
     private class MyOper extends OperationPartial {
         @Getter
         private int count = 0;
 
         @Setter
         private boolean genException;
-
         @Setter
         private int maxFailures = 0;
-
         @Setter
-        private CompletableFuture<OperationOutcome> guard;
+        private CompletableFuture<OperationOutcome> preProc;
 
 
         public MyOper() {
-            super(OperationPartialTest.this.params, operator);
+            super(OperationPartialTest.this.params, config, PROP_NAMES);
         }
 
         @Override
@@ -1252,11 +1350,6 @@ public class OperationPartialTest {
             return operation;
         }
 
-        @Override
-        protected CompletableFuture<OperationOutcome> startGuardAsync() {
-            return (guard != null ? guard : super.startGuardAsync());
-        }
-
         @Override
         protected long getRetryWaitMs() {
             /*
@@ -1265,5 +1358,10 @@ public class OperationPartialTest {
              */
             return 0L;
         }
+
+        @Override
+        protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
+            return (preProc != null ? preProc : super.startPreprocessorAsync());
+        }
     }
 }