package org.onap.policy.controlloop.actorserviceprovider.impl;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import ch.qos.logback.classic.Logger;
import java.time.Instant;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
+import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.onap.policy.common.utils.time.PseudoExecutor;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.VirtualControlLoopEvent;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.LoggerFactory;
public class OperationPartialTest {
- private static final int MAX_PARALLEL_REQUESTS = 10;
+ private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
+ private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
+ private static final int MAX_REQUESTS = 100;
+ private static final int MAX_PARALLEL = 10;
private static final String EXPECTED_EXCEPTION = "expected exception";
private static final String ACTOR = "my-actor";
private static final String OPERATION = "my-operation";
- private static final String TARGET = "my-target";
+ private static final String MY_SINK = "my-sink";
+ private static final String MY_SOURCE = "my-source";
+ private static final String TEXT = "my-text";
private static final int TIMEOUT = 1000;
private static final UUID REQ_ID = UUID.randomUUID();
private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
.filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
+ /**
+ * Used to attach an appender to the class' logger.
+ */
+ private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
+ private static final ExtractAppender appender = new ExtractAppender();
+
private VirtualControlLoopEvent event;
private ControlLoopEventContext context;
- private MyExec executor;
+ private PseudoExecutor executor;
private ControlLoopOperationParams params;
private MyOper oper;
private OperatorPartial operator;
+ /**
+ * Attaches the appender to the logger.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ /**
+ * Attach appender to the logger.
+ */
+ appender.setContext(logger.getLoggerContext());
+ appender.start();
+
+ logger.addAppender(appender);
+ }
+
+ /**
+ * Stops the appender.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() {
+ appender.stop();
+ }
+
/**
* Initializes the fields, including {@link #oper}.
*/
event.setRequestId(REQ_ID);
context = new ControlLoopEventContext(event);
- executor = new MyExec(100 * MAX_PARALLEL_REQUESTS);
+ executor = new PseudoExecutor();
params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
.executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
- .startCallback(this::starter).targetEntity(TARGET).build();
+ .startCallback(this::starter).targetEntity(MY_SINK).build();
operator = new OperatorPartial(ACTOR, OPERATION) {
@Override
*/
@Test
public void testStartMultiple() {
- for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
+ for (int count = 0; count < MAX_PARALLEL; ++count) {
oper.start();
}
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
assertNotNull(opstart);
assertNotNull(opend);
assertEquals(PolicyResult.SUCCESS, opend.getResult());
- assertEquals(MAX_PARALLEL_REQUESTS, numStart);
- assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
- assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
+ assertEquals(MAX_PARALLEL, numStart);
+ assertEquals(MAX_PARALLEL, oper.getCount());
+ assertEquals(MAX_PARALLEL, numEnd);
}
/**
oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
oper.start().cancel(false);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertNull(opstart);
assertNull(opend);
@Test
public void testStartOperationAsync() {
oper.start();
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertEquals(1, oper.getCount());
}
outcome.setResult(PolicyResult.FAILURE);
// incorrect actor
- outcome.setActor(TARGET);
+ outcome.setActor(MY_SINK);
assertFalse(oper.isActorFailed(outcome));
outcome.setActor(null);
assertFalse(oper.isActorFailed(outcome));
outcome.setActor(ACTOR);
// incorrect operation
- outcome.setOperation(TARGET);
+ outcome.setOperation(MY_SINK);
assertFalse(oper.isActorFailed(outcome));
outcome.setOperation(null);
assertFalse(oper.isActorFailed(outcome));
OperationPartial oper2 = new OperationPartial(params, operator) {};
oper2.start();
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertNotNull(opend);
assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
// wrong actor - should be false
outcome.setActor(null);
assertFalse(oper.isSameOperation(outcome));
- outcome.setActor(TARGET);
+ outcome.setActor(MY_SINK);
assertFalse(oper.isSameOperation(outcome));
outcome.setActor(ACTOR);
// wrong operation - should be null
outcome.setOperation(null);
assertFalse(oper.isSameOperation(outcome));
- outcome.setOperation(TARGET);
+ outcome.setOperation(MY_SINK);
assertFalse(oper.isSameOperation(outcome));
outcome.setOperation(OPERATION);
@Test
public void testAnyOf() throws Exception {
// first task completes, others do not
- List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
final OperationOutcome outcome = params.makeOutcome();
- tasks.add(CompletableFuture.completedFuture(outcome));
- tasks.add(new CompletableFuture<>());
- tasks.add(new CompletableFuture<>());
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> new CompletableFuture<>());
+ tasks.add(() -> null);
+ tasks.add(() -> new CompletableFuture<>());
CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+ // repeat using array form
+ @SuppressWarnings("unchecked")
+ Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
+ result = oper.anyOf(tasks.toArray(taskArray));
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertSame(outcome, result.get());
// second task completes, others do not
- tasks = new LinkedList<>();
-
- tasks.add(new CompletableFuture<>());
- tasks.add(CompletableFuture.completedFuture(outcome));
- tasks.add(new CompletableFuture<>());
+ tasks.clear();
+ tasks.add(() -> new CompletableFuture<>());
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> new CompletableFuture<>());
result = oper.anyOf(tasks);
- assertTrue(executor.runAll());
-
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertSame(outcome, result.get());
// third task completes, others do not
- tasks = new LinkedList<>();
-
- tasks.add(new CompletableFuture<>());
- tasks.add(new CompletableFuture<>());
- tasks.add(CompletableFuture.completedFuture(outcome));
+ tasks.clear();
+ tasks.add(() -> new CompletableFuture<>());
+ tasks.add(() -> new CompletableFuture<>());
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
result = oper.anyOf(tasks);
- assertTrue(executor.runAll());
-
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertSame(outcome, result.get());
}
@Test
@SuppressWarnings("unchecked")
public void testAnyOfEdge() throws Exception {
- List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
// zero items: check both using a list and using an array
- assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf(tasks));
- assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf());
+ assertNull(oper.anyOf(tasks));
+ assertNull(oper.anyOf());
// one item: : check both using a list and using an array
CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
- tasks.add(future1);
+ tasks.add(() -> future1);
assertSame(future1, oper.anyOf(tasks));
- assertSame(future1, oper.anyOf(future1));
+ assertSame(future1, oper.anyOf(() -> future1));
}
- /**
- * Tests both flavors of allOf(), because one invokes the other.
- */
@Test
- public void testAllOf() throws Exception {
- List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+ public void testAllOfArray() throws Exception {
+ final OperationOutcome outcome = params.makeOutcome();
+ CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
+
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome> result =
+ oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
+
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertFalse(result.isDone());
+ future1.complete(outcome);
+
+ // complete 3 before 2
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertFalse(result.isDone());
+ future3.complete(outcome);
+
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertFalse(result.isDone());
+ future2.complete(outcome);
+
+ // all of them are now done
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+ }
+
+ @Test
+ public void testAllOfList() throws Exception {
final OperationOutcome outcome = params.makeOutcome();
CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
- tasks.add(future1);
- tasks.add(future2);
- tasks.add(future3);
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+ tasks.add(() -> future1);
+ tasks.add(() -> future2);
+ tasks.add(() -> null);
+ tasks.add(() -> future3);
CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertFalse(result.isDone());
future1.complete(outcome);
// complete 3 before 2
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertFalse(result.isDone());
future3.complete(outcome);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertFalse(result.isDone());
future2.complete(outcome);
// all of them are now done
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertSame(outcome, result.get());
}
@Test
@SuppressWarnings("unchecked")
public void testAllOfEdge() throws Exception {
- List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
// zero items: check both using a list and using an array
- assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf(tasks));
- assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf());
+ assertNull(oper.allOf(tasks));
+ assertNull(oper.allOf());
// one item: : check both using a list and using an array
CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
- tasks.add(future1);
+ tasks.add(() -> future1);
assertSame(future1, oper.allOf(tasks));
- assertSame(future1, oper.allOf(future1));
+ assertSame(future1, oper.allOf(() -> future1));
+ }
+
+ @Test
+ public void testAttachFutures() throws Exception {
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+
+ // third task throws an exception during construction
+ CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
+ tasks.add(() -> future1);
+ tasks.add(() -> future2);
+ tasks.add(() -> {
+ throw new IllegalStateException(EXPECTED_EXCEPTION);
+ });
+ tasks.add(() -> future3);
+
+ assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
+
+ // should have canceled the first two, but not the last
+ assertTrue(future1.isCancelled());
+ assertTrue(future2.isCancelled());
+ assertFalse(future3.isCancelled());
}
@Test
verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
- // null outcome
- final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
- tasks.add(CompletableFuture.completedFuture(null));
+ // null outcome - takes precedence over a success
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+ tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
+ tasks.add(() -> CompletableFuture.completedFuture(null));
+ tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertNull(result.get());
IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
tasks.clear();
- tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
- tasks.add(CompletableFuture.failedFuture(except));
- tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
+ tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
+ tasks.add(() -> CompletableFuture.failedFuture(except));
+ tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
result = oper.allOf(tasks);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isCompletedExceptionally());
result.whenComplete((unused, thrown) -> assertSame(except, thrown));
}
- private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
- List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+ /**
+ * Tests both flavors of sequence(), because one invokes the other.
+ */
+ @Test
+ public void testSequence() throws Exception {
+ final OperationOutcome outcome = params.makeOutcome();
+
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> null);
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+
+ CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+
+ // repeat using array form
+ @SuppressWarnings("unchecked")
+ Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
+ result = oper.sequence(tasks.toArray(taskArray));
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+
+ // second task fails, third should not run
+ OperationOutcome failure = params.makeOutcome();
+ failure.setResult(PolicyResult.FAILURE);
+ tasks.clear();
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> CompletableFuture.completedFuture(failure));
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+
+ result = oper.sequence(tasks);
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(result.isDone());
+ assertSame(failure, result.get());
+ }
+
+ /**
+ * Tests both flavors of sequence(), for edge cases: zero items, and one item.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSequenceEdge() throws Exception {
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+
+ // zero items: check both using a list and using an array
+ assertNull(oper.sequence(tasks));
+ assertNull(oper.sequence());
+ // one item: : check both using a list and using an array
+ CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+ tasks.add(() -> future1);
+
+ assertSame(future1, oper.sequence(tasks));
+ assertSame(future1, oper.sequence(() -> future1));
+ }
+
+ private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
OperationOutcome expectedOutcome = null;
for (int count = 0; count < results.length; ++count) {
OperationOutcome outcome = params.makeOutcome();
outcome.setResult(results[count]);
- tasks.add(CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
if (count == expected) {
expectedOutcome = outcome;
CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertSame(expectedOutcome, result.get());
}
- private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
- final OperationOutcome taskOutcome) {
-
- return outcome -> CompletableFuture.completedFuture(taskOutcome);
- }
-
@Test
public void testDetmPriority() throws CoderException {
assertEquals(1, oper.detmPriority(null));
assertEquals(1, oper.detmPriority(outcome));
}
- /**
- * Tests doTask(Future) when the controller is not running.
- */
- @Test
- public void testDoTaskFutureNotRunning() throws Exception {
- CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- controller.complete(params.makeOutcome());
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, params.makeOutcome(), taskFuture);
- assertFalse(future.isDone());
- assertTrue(executor.runAll());
-
- // should not have run the task
- assertFalse(future.isDone());
-
- // should have canceled the task future
- assertTrue(taskFuture.isCancelled());
- }
-
- /**
- * Tests doTask(Future) when the previous outcome was successful.
- */
- @Test
- public void testDoTaskFutureSuccess() throws Exception {
- CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
- final OperationOutcome taskOutcome = params.makeOutcome();
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, params.makeOutcome(), taskFuture);
-
- taskFuture.complete(taskOutcome);
- assertTrue(executor.runAll());
-
- assertTrue(future.isDone());
- assertSame(taskOutcome, future.get());
-
- // controller should not be done yet
- assertFalse(controller.isDone());
- }
-
- /**
- * Tests doTask(Future) when the previous outcome was failed.
- */
- @Test
- public void testDoTaskFutureFailure() throws Exception {
- CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
- final OperationOutcome failedOutcome = params.makeOutcome();
- failedOutcome.setResult(PolicyResult.FAILURE);
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, failedOutcome, taskFuture);
- assertFalse(future.isDone());
- assertTrue(executor.runAll());
-
- // should not have run the task
- assertFalse(future.isDone());
-
- // should have canceled the task future
- assertTrue(taskFuture.isCancelled());
-
- // controller SHOULD be done now
- assertTrue(controller.isDone());
- assertSame(failedOutcome, controller.get());
- }
-
- /**
- * Tests doTask(Future) when the previous outcome was failed, but not checking
- * success.
- */
- @Test
- public void testDoTaskFutureUncheckedFailure() throws Exception {
- CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
- final OperationOutcome failedOutcome = params.makeOutcome();
- failedOutcome.setResult(PolicyResult.FAILURE);
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, failedOutcome, taskFuture);
- assertFalse(future.isDone());
-
- // complete the task
- OperationOutcome taskOutcome = params.makeOutcome();
- taskFuture.complete(taskOutcome);
-
- assertTrue(executor.runAll());
-
- // should have run the task
- assertTrue(future.isDone());
-
- assertTrue(future.isDone());
- assertSame(taskOutcome, future.get());
-
- // controller should not be done yet
- assertFalse(controller.isDone());
- }
-
- /**
- * Tests doTask(Function) when the controller is not running.
- */
- @Test
- public void testDoTaskFunctionNotRunning() throws Exception {
- AtomicBoolean invoked = new AtomicBoolean();
-
- Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
- invoked.set(true);
- return CompletableFuture.completedFuture(params.makeOutcome());
- };
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- controller.complete(params.makeOutcome());
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(params.makeOutcome());
- assertFalse(future.isDone());
- assertTrue(executor.runAll());
-
- // should not have run the task
- assertFalse(future.isDone());
-
- // should not have even invoked the task
- assertFalse(invoked.get());
- }
-
- /**
- * Tests doTask(Function) when the previous outcome was successful.
- */
- @Test
- public void testDoTaskFunctionSuccess() throws Exception {
- final OperationOutcome taskOutcome = params.makeOutcome();
-
- final OperationOutcome failedOutcome = params.makeOutcome();
-
- Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
-
- assertTrue(future.isDone());
- assertSame(taskOutcome, future.get());
-
- // controller should not be done yet
- assertFalse(controller.isDone());
- }
-
- /**
- * Tests doTask(Function) when the previous outcome was failed.
- */
- @Test
- public void testDoTaskFunctionFailure() throws Exception {
- final OperationOutcome failedOutcome = params.makeOutcome();
- failedOutcome.setResult(PolicyResult.FAILURE);
-
- AtomicBoolean invoked = new AtomicBoolean();
-
- Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
- invoked.set(true);
- return CompletableFuture.completedFuture(params.makeOutcome());
- };
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
- assertFalse(future.isDone());
- assertTrue(executor.runAll());
-
- // should not have run the task
- assertFalse(future.isDone());
-
- // should not have even invoked the task
- assertFalse(invoked.get());
-
- // controller should have the failed task
- assertTrue(controller.isDone());
- assertSame(failedOutcome, controller.get());
- }
-
- /**
- * Tests doTask(Function) when the previous outcome was failed, but not checking
- * success.
- */
- @Test
- public void testDoTaskFunctionUncheckedFailure() throws Exception {
- final OperationOutcome taskOutcome = params.makeOutcome();
-
- final OperationOutcome failedOutcome = params.makeOutcome();
- failedOutcome.setResult(PolicyResult.FAILURE);
-
- Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(failedOutcome);
-
- assertTrue(future.isDone());
- assertSame(taskOutcome, future.get());
-
- // controller should not be done yet
- assertFalse(controller.isDone());
- }
-
/**
* Tests callbackStarted() when the pipeline has already been stopped.
*/
oper = new MyOper();
future.set(oper.start());
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
// should have only run once
assertEquals(1, numStart);
oper = new MyOper();
future.set(oper.start());
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
// should not have been set
assertNull(opend);
assertTrue(oper.isTimeout(new CompletionException(timex)));
}
+ @Test
+ public void testLogMessage() {
+ final String infraStr = SINK_INFRA.toString();
+
+ // log structured data
+ appender.clearExtractions();
+ oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
+ List<String> output = appender.getExtracted();
+ assertEquals(1, output.size());
+
+ assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
+ .contains("{\n \"text\": \"my-text\"\n}");
+
+ // repeat with a response
+ appender.clearExtractions();
+ oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
+ output = appender.getExtracted();
+ assertEquals(1, output.size());
+
+ assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
+ .contains("{\n \"text\": \"my-text\"\n}");
+
+ // log a plain string
+ appender.clearExtractions();
+ oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
+ output = appender.getExtracted();
+ assertEquals(1, output.size());
+ assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
+
+ // log a null request
+ appender.clearExtractions();
+ oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
+ output = appender.getExtracted();
+ assertEquals(1, output.size());
+
+ assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
+
+ // generate exception from coder
+ setOperCoderException();
+
+ appender.clearExtractions();
+ oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
+ output = appender.getExtracted();
+ assertEquals(2, output.size());
+ assertThat(output.get(0)).contains("cannot pretty-print request");
+ assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
+
+ // repeat with a response
+ appender.clearExtractions();
+ oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
+ output = appender.getExtracted();
+ assertEquals(2, output.size());
+ assertThat(output.get(0)).contains("cannot pretty-print response");
+ assertThat(output.get(1)).contains(MY_SOURCE);
+ }
+
@Test
public void testGetRetry() {
assertEquals(0, oper.getRetry(null));
manipulator.accept(future);
- assertTrue(testName, executor.runAll());
+ assertTrue(testName, executor.runAll(MAX_REQUESTS));
assertEquals(testName, expectedCallbacks, numStart);
assertEquals(testName, expectedCallbacks, numEnd);
assertEquals(testName, expectedOperations, oper.getCount());
}
+ /**
+ * Creates a new {@link #oper} whose coder will throw an exception.
+ */
+ private void setOperCoderException() {
+ oper = new MyOper() {
+ @Override
+ protected Coder makeCoder() {
+ return new StandardCoder() {
+ @Override
+ public String encode(Object object, boolean pretty) throws CoderException {
+ throw new CoderException(EXPECTED_EXCEPTION);
+ }
+ };
+ }
+ };
+ }
+
+
+ @Getter
+ public static class MyData {
+ private String text = TEXT;
+ }
+
+
private class MyOper extends OperationPartial {
@Getter
private int count = 0;