Use BidirectionalTopicClient from policy-common
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
index f28c1f6..67ac27c 100644 (file)
@@ -20,8 +20,8 @@
 
 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -30,6 +30,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import ch.qos.logback.classic.Logger;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.LinkedList;
@@ -45,42 +46,59 @@ import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.Getter;
 import lombok.Setter;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
+import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.CoderException;
 import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.onap.policy.common.utils.time.PseudoExecutor;
 import org.onap.policy.controlloop.ControlLoopOperation;
 import org.onap.policy.controlloop.VirtualControlLoopEvent;
 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.LoggerFactory;
 
 public class OperationPartialTest {
-    private static final int MAX_PARALLEL_REQUESTS = 10;
+    private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
+    private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
+    private static final int MAX_REQUESTS = 100;
+    private static final int MAX_PARALLEL = 10;
     private static final String EXPECTED_EXCEPTION = "expected exception";
     private static final String ACTOR = "my-actor";
     private static final String OPERATION = "my-operation";
-    private static final String TARGET = "my-target";
+    private static final String MY_SINK = "my-sink";
+    private static final String MY_SOURCE = "my-source";
+    private static final String TEXT = "my-text";
     private static final int TIMEOUT = 1000;
     private static final UUID REQ_ID = UUID.randomUUID();
 
     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
 
+    /**
+     * Used to attach an appender to the class' logger.
+     */
+    private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
+    private static final ExtractAppender appender = new ExtractAppender();
+
     private VirtualControlLoopEvent event;
     private ControlLoopEventContext context;
-    private MyExec executor;
+    private PseudoExecutor executor;
     private ControlLoopOperationParams params;
 
     private MyOper oper;
@@ -95,6 +113,28 @@ public class OperationPartialTest {
 
     private OperatorPartial operator;
 
+    /**
+     * Attaches the appender to the logger.
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        /**
+         * Attach appender to the logger.
+         */
+        appender.setContext(logger.getLoggerContext());
+        appender.start();
+
+        logger.addAppender(appender);
+    }
+
+    /**
+     * Stops the appender.
+     */
+    @AfterClass
+    public static void tearDownAfterClass() {
+        appender.stop();
+    }
+
     /**
      * Initializes the fields, including {@link #oper}.
      */
@@ -104,11 +144,11 @@ public class OperationPartialTest {
         event.setRequestId(REQ_ID);
 
         context = new ControlLoopEventContext(event);
-        executor = new MyExec(100 * MAX_PARALLEL_REQUESTS);
+        executor = new PseudoExecutor();
 
         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
                         .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
-                        .startCallback(this::starter).targetEntity(TARGET).build();
+                        .startCallback(this::starter).targetEntity(MY_SINK).build();
 
         operator = new OperatorPartial(ACTOR, OPERATION) {
             @Override
@@ -209,19 +249,19 @@ public class OperationPartialTest {
      */
     @Test
     public void testStartMultiple() {
-        for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
+        for (int count = 0; count < MAX_PARALLEL; ++count) {
             oper.start();
         }
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
 
         assertNotNull(opstart);
         assertNotNull(opend);
         assertEquals(PolicyResult.SUCCESS, opend.getResult());
 
-        assertEquals(MAX_PARALLEL_REQUESTS, numStart);
-        assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
-        assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
+        assertEquals(MAX_PARALLEL, numStart);
+        assertEquals(MAX_PARALLEL, oper.getCount());
+        assertEquals(MAX_PARALLEL, numEnd);
     }
 
     /**
@@ -254,7 +294,7 @@ public class OperationPartialTest {
         oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
 
         oper.start().cancel(false);
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         assertNull(opstart);
         assertNull(opend);
@@ -295,7 +335,7 @@ public class OperationPartialTest {
     @Test
     public void testStartOperationAsync() {
         oper.start();
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         assertEquals(1, oper.getCount());
     }
@@ -330,14 +370,14 @@ public class OperationPartialTest {
         outcome.setResult(PolicyResult.FAILURE);
 
         // incorrect actor
-        outcome.setActor(TARGET);
+        outcome.setActor(MY_SINK);
         assertFalse(oper.isActorFailed(outcome));
         outcome.setActor(null);
         assertFalse(oper.isActorFailed(outcome));
         outcome.setActor(ACTOR);
 
         // incorrect operation
-        outcome.setOperation(TARGET);
+        outcome.setOperation(MY_SINK);
         assertFalse(oper.isActorFailed(outcome));
         outcome.setOperation(null);
         assertFalse(oper.isActorFailed(outcome));
@@ -355,7 +395,7 @@ public class OperationPartialTest {
         OperationPartial oper2 = new OperationPartial(params, operator) {};
 
         oper2.start();
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         assertNotNull(opend);
         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
@@ -519,14 +559,14 @@ public class OperationPartialTest {
         // wrong actor - should be false
         outcome.setActor(null);
         assertFalse(oper.isSameOperation(outcome));
-        outcome.setActor(TARGET);
+        outcome.setActor(MY_SINK);
         assertFalse(oper.isSameOperation(outcome));
         outcome.setActor(ACTOR);
 
         // wrong operation - should be null
         outcome.setOperation(null);
         assertFalse(oper.isSameOperation(outcome));
-        outcome.setOperation(TARGET);
+        outcome.setOperation(MY_SINK);
         assertFalse(oper.isSameOperation(outcome));
         outcome.setOperation(OPERATION);
 
@@ -584,43 +624,47 @@ public class OperationPartialTest {
     @Test
     public void testAnyOf() throws Exception {
         // first task completes, others do not
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
         final OperationOutcome outcome = params.makeOutcome();
 
-        tasks.add(CompletableFuture.completedFuture(outcome));
-        tasks.add(new CompletableFuture<>());
-        tasks.add(new CompletableFuture<>());
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> null);
+        tasks.add(() -> new CompletableFuture<>());
 
         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
 
+        // repeat using array form
+        @SuppressWarnings("unchecked")
+        Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
+        result = oper.anyOf(tasks.toArray(taskArray));
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
 
         // second task completes, others do not
-        tasks = new LinkedList<>();
-
-        tasks.add(new CompletableFuture<>());
-        tasks.add(CompletableFuture.completedFuture(outcome));
-        tasks.add(new CompletableFuture<>());
+        tasks.clear();
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> new CompletableFuture<>());
 
         result = oper.anyOf(tasks);
-        assertTrue(executor.runAll());
-
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
 
         // third task completes, others do not
-        tasks = new LinkedList<>();
-
-        tasks.add(new CompletableFuture<>());
-        tasks.add(new CompletableFuture<>());
-        tasks.add(CompletableFuture.completedFuture(outcome));
+        tasks.clear();
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> new CompletableFuture<>());
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
         result = oper.anyOf(tasks);
-        assertTrue(executor.runAll());
-
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
     }
@@ -631,54 +675,82 @@ public class OperationPartialTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testAnyOfEdge() throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
         // zero items: check both using a list and using an array
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf(tasks));
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf());
+        assertNull(oper.anyOf(tasks));
+        assertNull(oper.anyOf());
 
         // one item: : check both using a list and using an array
         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
-        tasks.add(future1);
+        tasks.add(() -> future1);
 
         assertSame(future1, oper.anyOf(tasks));
-        assertSame(future1, oper.anyOf(future1));
+        assertSame(future1, oper.anyOf(() -> future1));
     }
 
-    /**
-     * Tests both flavors of allOf(), because one invokes the other.
-     */
     @Test
-    public void testAllOf() throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+    public void testAllOfArray() throws Exception {
+        final OperationOutcome outcome = params.makeOutcome();
 
+        CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
+
+        @SuppressWarnings("unchecked")
+        CompletableFuture<OperationOutcome> result =
+                        oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
+
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(result.isDone());
+        future1.complete(outcome);
+
+        // complete 3 before 2
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(result.isDone());
+        future3.complete(outcome);
+
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertFalse(result.isDone());
+        future2.complete(outcome);
+
+        // all of them are now done
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
+    }
+
+    @Test
+    public void testAllOfList() throws Exception {
         final OperationOutcome outcome = params.makeOutcome();
 
         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
-        tasks.add(future1);
-        tasks.add(future2);
-        tasks.add(future3);
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+        tasks.add(() -> future1);
+        tasks.add(() -> future2);
+        tasks.add(() -> null);
+        tasks.add(() -> future3);
 
         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertFalse(result.isDone());
         future1.complete(outcome);
 
         // complete 3 before 2
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertFalse(result.isDone());
         future3.complete(outcome);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertFalse(result.isDone());
         future2.complete(outcome);
 
         // all of them are now done
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(outcome, result.get());
     }
@@ -689,18 +761,41 @@ public class OperationPartialTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testAllOfEdge() throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
         // zero items: check both using a list and using an array
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf(tasks));
-        assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf());
+        assertNull(oper.allOf(tasks));
+        assertNull(oper.allOf());
 
         // one item: : check both using a list and using an array
         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
-        tasks.add(future1);
+        tasks.add(() -> future1);
 
         assertSame(future1, oper.allOf(tasks));
-        assertSame(future1, oper.allOf(future1));
+        assertSame(future1, oper.allOf(() -> future1));
+    }
+
+    @Test
+    public void testAttachFutures() throws Exception {
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+
+        // third task throws an exception during construction
+        CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
+        CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
+        tasks.add(() -> future1);
+        tasks.add(() -> future2);
+        tasks.add(() -> {
+            throw new IllegalStateException(EXPECTED_EXCEPTION);
+        });
+        tasks.add(() -> future3);
+
+        assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
+
+        // should have canceled the first two, but not the last
+        assertTrue(future1.isCancelled());
+        assertTrue(future2.isCancelled());
+        assertFalse(future3.isCancelled());
     }
 
     @Test
@@ -714,12 +809,14 @@ public class OperationPartialTest {
         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
 
-        // null outcome
-        final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
-        tasks.add(CompletableFuture.completedFuture(null));
+        // null outcome - takes precedence over a success
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
+        tasks.add(() -> CompletableFuture.completedFuture(null));
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertNull(result.get());
 
@@ -727,26 +824,85 @@ public class OperationPartialTest {
         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
 
         tasks.clear();
-        tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
-        tasks.add(CompletableFuture.failedFuture(except));
-        tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
+        tasks.add(() -> CompletableFuture.failedFuture(except));
+        tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
         result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isCompletedExceptionally());
         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
     }
 
-    private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
-        List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+    /**
+     * Tests both flavors of sequence(), because one invokes the other.
+     */
+    @Test
+    public void testSequence() throws Exception {
+        final OperationOutcome outcome = params.makeOutcome();
+
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> null);
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+
+        CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
+
+        // repeat using array form
+        @SuppressWarnings("unchecked")
+        Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
+        result = oper.sequence(tasks.toArray(taskArray));
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(outcome, result.get());
+
+        // second task fails, third should not run
+        OperationOutcome failure = params.makeOutcome();
+        failure.setResult(PolicyResult.FAILURE);
+        tasks.clear();
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+        tasks.add(() -> CompletableFuture.completedFuture(failure));
+        tasks.add(() -> CompletableFuture.completedFuture(outcome));
+
+        result = oper.sequence(tasks);
+        assertTrue(executor.runAll(MAX_REQUESTS));
+        assertTrue(result.isDone());
+        assertSame(failure, result.get());
+    }
+
+    /**
+     * Tests both flavors of sequence(), for edge cases: zero items, and one item.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testSequenceEdge() throws Exception {
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+
+        // zero items: check both using a list and using an array
+        assertNull(oper.sequence(tasks));
+        assertNull(oper.sequence());
 
+        // one item: : check both using a list and using an array
+        CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+        tasks.add(() -> future1);
+
+        assertSame(future1, oper.sequence(tasks));
+        assertSame(future1, oper.sequence(() -> future1));
+    }
+
+    private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
+        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
         OperationOutcome expectedOutcome = null;
 
         for (int count = 0; count < results.length; ++count) {
             OperationOutcome outcome = params.makeOutcome();
             outcome.setResult(results[count]);
-            tasks.add(CompletableFuture.completedFuture(outcome));
+            tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
             if (count == expected) {
                 expectedOutcome = outcome;
@@ -755,17 +911,11 @@ public class OperationPartialTest {
 
         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
         assertTrue(result.isDone());
         assertSame(expectedOutcome, result.get());
     }
 
-    private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
-                    final OperationOutcome taskOutcome) {
-
-        return outcome -> CompletableFuture.completedFuture(taskOutcome);
-    }
-
     @Test
     public void testDetmPriority() throws CoderException {
         assertEquals(1, oper.detmPriority(null));
@@ -789,210 +939,6 @@ public class OperationPartialTest {
         assertEquals(1, oper.detmPriority(outcome));
     }
 
-    /**
-     * Tests doTask(Future) when the controller is not running.
-     */
-    @Test
-    public void testDoTaskFutureNotRunning() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-        controller.complete(params.makeOutcome());
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, params.makeOutcome(), taskFuture);
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should have canceled the task future
-        assertTrue(taskFuture.isCancelled());
-    }
-
-    /**
-     * Tests doTask(Future) when the previous outcome was successful.
-     */
-    @Test
-    public void testDoTaskFutureSuccess() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-        final OperationOutcome taskOutcome = params.makeOutcome();
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, params.makeOutcome(), taskFuture);
-
-        taskFuture.complete(taskOutcome);
-        assertTrue(executor.runAll());
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
-    /**
-     * Tests doTask(Future) when the previous outcome was failed.
-     */
-    @Test
-    public void testDoTaskFutureFailure() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, failedOutcome, taskFuture);
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should have canceled the task future
-        assertTrue(taskFuture.isCancelled());
-
-        // controller SHOULD be done now
-        assertTrue(controller.isDone());
-        assertSame(failedOutcome, controller.get());
-    }
-
-    /**
-     * Tests doTask(Future) when the previous outcome was failed, but not checking
-     * success.
-     */
-    @Test
-    public void testDoTaskFutureUncheckedFailure() throws Exception {
-        CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, failedOutcome, taskFuture);
-        assertFalse(future.isDone());
-
-        // complete the task
-        OperationOutcome taskOutcome = params.makeOutcome();
-        taskFuture.complete(taskOutcome);
-
-        assertTrue(executor.runAll());
-
-        // should have run the task
-        assertTrue(future.isDone());
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
-    /**
-     * Tests doTask(Function) when the controller is not running.
-     */
-    @Test
-    public void testDoTaskFunctionNotRunning() throws Exception {
-        AtomicBoolean invoked = new AtomicBoolean();
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
-            invoked.set(true);
-            return CompletableFuture.completedFuture(params.makeOutcome());
-        };
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-        controller.complete(params.makeOutcome());
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(params.makeOutcome());
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should not have even invoked the task
-        assertFalse(invoked.get());
-    }
-
-    /**
-     * Tests doTask(Function) when the previous outcome was successful.
-     */
-    @Test
-    public void testDoTaskFunctionSuccess() throws Exception {
-        final OperationOutcome taskOutcome = params.makeOutcome();
-
-        final OperationOutcome failedOutcome = params.makeOutcome();
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
-    /**
-     * Tests doTask(Function) when the previous outcome was failed.
-     */
-    @Test
-    public void testDoTaskFunctionFailure() throws Exception {
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        AtomicBoolean invoked = new AtomicBoolean();
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
-            invoked.set(true);
-            return CompletableFuture.completedFuture(params.makeOutcome());
-        };
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
-        assertFalse(future.isDone());
-        assertTrue(executor.runAll());
-
-        // should not have run the task
-        assertFalse(future.isDone());
-
-        // should not have even invoked the task
-        assertFalse(invoked.get());
-
-        // controller should have the failed task
-        assertTrue(controller.isDone());
-        assertSame(failedOutcome, controller.get());
-    }
-
-    /**
-     * Tests doTask(Function) when the previous outcome was failed, but not checking
-     * success.
-     */
-    @Test
-    public void testDoTaskFunctionUncheckedFailure() throws Exception {
-        final OperationOutcome taskOutcome = params.makeOutcome();
-
-        final OperationOutcome failedOutcome = params.makeOutcome();
-        failedOutcome.setResult(PolicyResult.FAILURE);
-
-        Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
-
-        PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
-        CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(failedOutcome);
-
-        assertTrue(future.isDone());
-        assertSame(taskOutcome, future.get());
-
-        // controller should not be done yet
-        assertFalse(controller.isDone());
-    }
-
     /**
      * Tests callbackStarted() when the pipeline has already been stopped.
      */
@@ -1013,7 +959,7 @@ public class OperationPartialTest {
         oper = new MyOper();
 
         future.set(oper.start());
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         // should have only run once
         assertEquals(1, numStart);
@@ -1035,7 +981,7 @@ public class OperationPartialTest {
         oper = new MyOper();
 
         future.set(oper.start());
-        assertTrue(executor.runAll());
+        assertTrue(executor.runAll(MAX_REQUESTS));
 
         // should not have been set
         assertNull(opend);
@@ -1090,6 +1036,62 @@ public class OperationPartialTest {
         assertTrue(oper.isTimeout(new CompletionException(timex)));
     }
 
+    @Test
+    public void testLogMessage() {
+        final String infraStr = SINK_INFRA.toString();
+
+        // log structured data
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
+        List<String> output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
+                        .contains("{\n  \"text\": \"my-text\"\n}");
+
+        // repeat with a response
+        appender.clearExtractions();
+        oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
+                        .contains("{\n  \"text\": \"my-text\"\n}");
+
+        // log a plain string
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+        assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
+
+        // log a null request
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
+        output = appender.getExtracted();
+        assertEquals(1, output.size());
+
+        assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
+
+        // generate exception from coder
+        setOperCoderException();
+
+        appender.clearExtractions();
+        oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
+        output = appender.getExtracted();
+        assertEquals(2, output.size());
+        assertThat(output.get(0)).contains("cannot pretty-print request");
+        assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
+
+        // repeat with a response
+        appender.clearExtractions();
+        oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
+        output = appender.getExtracted();
+        assertEquals(2, output.size());
+        assertThat(output.get(0)).contains("cannot pretty-print response");
+        assertThat(output.get(1)).contains(MY_SOURCE);
+    }
+
     @Test
     public void testGetRetry() {
         assertEquals(0, oper.getRetry(null));
@@ -1187,7 +1189,7 @@ public class OperationPartialTest {
 
         manipulator.accept(future);
 
-        assertTrue(testName, executor.runAll());
+        assertTrue(testName, executor.runAll(MAX_REQUESTS));
 
         assertEquals(testName, expectedCallbacks, numStart);
         assertEquals(testName, expectedCallbacks, numEnd);
@@ -1216,6 +1218,30 @@ public class OperationPartialTest {
         assertEquals(testName, expectedOperations, oper.getCount());
     }
 
+    /**
+     * Creates a new {@link #oper} whose coder will throw an exception.
+     */
+    private void setOperCoderException() {
+        oper = new MyOper() {
+            @Override
+            protected Coder makeCoder() {
+                return new StandardCoder() {
+                    @Override
+                    public String encode(Object object, boolean pretty) throws CoderException {
+                        throw new CoderException(EXPECTED_EXCEPTION);
+                    }
+                };
+            }
+        };
+    }
+
+
+    @Getter
+    public static class MyData {
+        private String text = TEXT;
+    }
+
+
     private class MyOper extends OperationPartial {
         @Getter
         private int count = 0;