package org.onap.policy.controlloop.actorserviceprovider.impl;
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+import ch.qos.logback.classic.Logger;
import java.time.Instant;
+import java.util.ArrayDeque;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
+import org.onap.policy.common.utils.coder.Coder;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
+import org.onap.policy.common.utils.time.PseudoExecutor;
import org.onap.policy.controlloop.ControlLoopOperation;
-import org.onap.policy.controlloop.VirtualControlLoopEvent;
+import org.onap.policy.controlloop.actorserviceprovider.ActorService;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
-import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
+import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Operator;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
-import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
-import org.onap.policy.controlloop.policy.PolicyResult;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
+import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
+import org.slf4j.LoggerFactory;
public class OperationPartialTest {
- private static final int MAX_PARALLEL_REQUESTS = 10;
+ private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
+ private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
+ private static final int MAX_REQUESTS = 100;
+ private static final int MAX_PARALLEL = 10;
private static final String EXPECTED_EXCEPTION = "expected exception";
private static final String ACTOR = "my-actor";
private static final String OPERATION = "my-operation";
- private static final String TARGET = "my-target";
+ private static final String MY_SINK = "my-sink";
+ private static final String MY_SOURCE = "my-source";
+ private static final String MY_TARGET_ENTITY = "my-entity";
+ private static final String TEXT = "my-text";
private static final int TIMEOUT = 1000;
private static final UUID REQ_ID = UUID.randomUUID();
- private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
- .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
+ private static final List<OperationResult> FAILURE_RESULTS = Arrays.asList(OperationResult.values()).stream()
+ .filter(result -> result != OperationResult.SUCCESS).collect(Collectors.toList());
- private VirtualControlLoopEvent event;
- private ControlLoopEventContext context;
- private MyExec executor;
+ /**
+ * Used to attach an appender to the class' logger.
+ */
+ private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
+ private static final ExtractAppender appender = new ExtractAppender();
+
+ private static final List<String> PROP_NAMES = List.of("hello", "world");
+
+ @Mock
+ private ActorService service;
+ @Mock
+ private Actor guardActor;
+ @Mock
+ private Operator guardOperator;
+ @Mock
+ private Operation guardOperation;
+
+ private PseudoExecutor executor;
private ControlLoopOperationParams params;
private MyOper oper;
private OperationOutcome opstart;
private OperationOutcome opend;
- private OperatorPartial operator;
+ private Deque<OperationOutcome> starts;
+ private Deque<OperationOutcome> ends;
+
+ private OperatorConfig config;
+
+ /**
+ * Attaches the appender to the logger.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ /*
+ * Attach appender to the logger.
+ */
+ appender.setContext(logger.getLoggerContext());
+ appender.start();
+
+ logger.addAppender(appender);
+ }
+
+ /**
+ * Stops the appender.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() {
+ appender.stop();
+ }
/**
* Initializes the fields, including {@link #oper}.
*/
@Before
public void setUp() {
- event = new VirtualControlLoopEvent();
- event.setRequestId(REQ_ID);
+ MockitoAnnotations.initMocks(this);
+ executor = new PseudoExecutor();
- context = new ControlLoopEventContext(event);
- executor = new MyExec();
+ params = ControlLoopOperationParams.builder().completeCallback(this::completer).requestId(REQ_ID)
+ .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
+ .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
- params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
- .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
- .startCallback(this::starter).targetEntity(TARGET).build();
+ when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
+ when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
+ when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
+ when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
- operator = new OperatorPartial(ACTOR, OPERATION) {
- @Override
- public Executor getBlockingExecutor() {
- return executor;
- }
-
- @Override
- public Operation buildOperation(ControlLoopOperationParams params) {
- return null;
- }
- };
-
- operator.configure(null);
- operator.start();
+ config = new OperatorConfig(executor);
oper = new MyOper();
opstart = null;
opend = null;
+
+ starts = new ArrayDeque<>(10);
+ ends = new ArrayDeque<>(10);
}
@Test
assertNull(future.get(5, TimeUnit.SECONDS));
}
- /**
- * Exercises the doXxx() methods.
- */
@Test
- public void testDoXxx() {
- assertThatCode(() -> operator.doConfigure(null)).doesNotThrowAnyException();
- assertThatCode(() -> operator.doStart()).doesNotThrowAnyException();
- assertThatCode(() -> operator.doStop()).doesNotThrowAnyException();
- assertThatCode(() -> operator.doShutdown()).doesNotThrowAnyException();
-
+ public void testGetPropertyNames() {
+ assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
}
@Test
- public void testStart() {
- verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
- }
+ public void testGetProperty_testSetProperty_testGetRequiredProperty() {
+ oper.setProperty("propertyA", "valueA");
+ oper.setProperty("propertyB", "valueB");
+ oper.setProperty("propertyC", 20);
+ oper.setProperty("propertyD", "valueD");
- /**
- * Tests startOperation() when the operator is not running.
- */
- @Test
- public void testStartNotRunning() {
- // stop the operator
- operator.stop();
+ assertEquals("valueA", oper.getProperty("propertyA"));
+ assertEquals("valueB", oper.getProperty("propertyB"));
+ assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
+
+ assertEquals("valueD", oper.getRequiredProperty("propertyD", "typeD"));
- assertThatIllegalStateException().isThrownBy(() -> oper.start());
+ assertThatIllegalStateException().isThrownBy(() -> oper.getRequiredProperty("propertyUnknown", "some type"))
+ .withMessage("missing some type");
}
- /**
- * Tests startOperation() when the operation has a preprocessor.
- */
@Test
- public void testStartWithPreprocessor() {
- AtomicInteger count = new AtomicInteger();
-
- CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
- count.incrementAndGet();
- return makeSuccess();
- }, executor);
-
- oper.setGuard(preproc);
-
- verifyRun("testStartWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
-
- assertEquals(1, count.get());
+ public void testStart() {
+ verifyRun("testStart", 1, 1, OperationResult.SUCCESS);
}
/**
*/
@Test
public void testStartMultiple() {
- for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
+ for (int count = 0; count < MAX_PARALLEL; ++count) {
oper.start();
}
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
assertNotNull(opstart);
assertNotNull(opend);
- assertEquals(PolicyResult.SUCCESS, opend.getResult());
-
- assertEquals(MAX_PARALLEL_REQUESTS, numStart);
- assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
- assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
- }
-
- /**
- * Tests startPreprocessor() when the preprocessor returns a failure.
- */
- @Test
- public void testStartPreprocessorFailure() {
- oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
-
- verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
- }
-
- /**
- * Tests startPreprocessor() when the preprocessor throws an exception.
- */
- @Test
- public void testStartPreprocessorException() {
- // arrange for the preprocessor to throw an exception
- oper.setGuard(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
-
- verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
- }
-
- /**
- * Tests startPreprocessor() when the pipeline is not running.
- */
- @Test
- public void testStartPreprocessorNotRunning() {
- // arrange for the preprocessor to return success, which will be ignored
- oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
-
- oper.start().cancel(false);
- assertTrue(executor.runAll());
-
- assertNull(opstart);
- assertNull(opend);
-
- assertEquals(0, numStart);
- assertEquals(0, oper.getCount());
- assertEquals(0, numEnd);
- }
-
- /**
- * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
- */
- @Test
- public void testStartPreprocessorBuilderException() {
- oper = new MyOper() {
- @Override
- protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
- throw new IllegalStateException(EXPECTED_EXCEPTION);
- }
- };
-
- assertThatIllegalStateException().isThrownBy(() -> oper.start());
-
- // should be nothing in the queue
- assertEquals(0, executor.getQueueLength());
- }
-
- @Test
- public void testStartPreprocessorAsync() {
- assertNull(oper.startPreprocessorAsync());
- }
+ assertEquals(OperationResult.SUCCESS, opend.getResult());
- @Test
- public void testStartGuardAsync() {
- assertNull(oper.startGuardAsync());
+ assertEquals(MAX_PARALLEL, numStart);
+ assertEquals(MAX_PARALLEL, oper.getCount());
+ assertEquals(MAX_PARALLEL, numEnd);
}
@Test
public void testStartOperationAsync() {
oper.start();
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertEquals(1, oper.getCount());
}
@Test
public void testIsSuccess() {
+ assertFalse(oper.isSuccess(null));
+
OperationOutcome outcome = new OperationOutcome();
- outcome.setResult(PolicyResult.SUCCESS);
+ outcome.setResult(OperationResult.SUCCESS);
assertTrue(oper.isSuccess(outcome));
- for (PolicyResult failure : FAILURE_RESULTS) {
+ for (OperationResult failure : FAILURE_RESULTS) {
outcome.setResult(failure);
assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
}
public void testIsActorFailed() {
assertFalse(oper.isActorFailed(null));
- OperationOutcome outcome = params.makeOutcome();
+ OperationOutcome outcome = params.makeOutcome(null);
// incorrect outcome
- outcome.setResult(PolicyResult.SUCCESS);
+ outcome.setResult(OperationResult.SUCCESS);
assertFalse(oper.isActorFailed(outcome));
- outcome.setResult(PolicyResult.FAILURE_RETRIES);
+ outcome.setResult(OperationResult.FAILURE_RETRIES);
assertFalse(oper.isActorFailed(outcome));
// correct outcome
- outcome.setResult(PolicyResult.FAILURE);
+ outcome.setResult(OperationResult.FAILURE);
// incorrect actor
- outcome.setActor(TARGET);
+ outcome.setActor(MY_SINK);
assertFalse(oper.isActorFailed(outcome));
outcome.setActor(null);
assertFalse(oper.isActorFailed(outcome));
outcome.setActor(ACTOR);
// incorrect operation
- outcome.setOperation(TARGET);
+ outcome.setOperation(MY_SINK);
assertFalse(oper.isActorFailed(outcome));
outcome.setOperation(null);
assertFalse(oper.isActorFailed(outcome));
/*
* Use an operation that doesn't override doOperation().
*/
- OperationPartial oper2 = new OperationPartial(params, operator) {};
+ OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
oper2.start();
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertNotNull(opend);
- assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
+ assertEquals(OperationResult.FAILURE_EXCEPTION, opend.getResult());
}
@Test
@Override
protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
- OperationOutcome outcome2 = params.makeOutcome();
- outcome2.setResult(PolicyResult.SUCCESS);
+ OperationOutcome outcome2 = params.makeOutcome(null);
+ outcome2.setResult(OperationResult.SUCCESS);
/*
* Create an incomplete future that will timeout after the operation's
}
};
- assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
+ assertEquals(OperationResult.FAILURE_TIMEOUT, oper.start().get().getResult());
}
/**
oper.setMaxFailures(10);
- verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
+ verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, OperationResult.FAILURE);
}
/**
oper.setMaxFailures(10);
- verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
+ verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, OperationResult.FAILURE);
}
/**
oper.setMaxFailures(10);
verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
- PolicyResult.FAILURE_RETRIES);
+ OperationResult.FAILURE_RETRIES);
}
/**
oper.setMaxFailures(maxFailures);
verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
- PolicyResult.SUCCESS);
+ OperationResult.SUCCESS);
}
/**
// arrange to return null from doOperation()
oper = new MyOper() {
@Override
- protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
+ protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
// update counters
- super.doOperation(attempt, operation);
+ super.doOperation(attempt, outcome);
return null;
}
};
- verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
+ verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, OperationResult.FAILURE, noop());
}
@Test
public void testIsSameOperation() {
assertFalse(oper.isSameOperation(null));
- OperationOutcome outcome = params.makeOutcome();
+ OperationOutcome outcome = params.makeOutcome(null);
// wrong actor - should be false
outcome.setActor(null);
assertFalse(oper.isSameOperation(outcome));
- outcome.setActor(TARGET);
+ outcome.setActor(MY_SINK);
assertFalse(oper.isSameOperation(outcome));
outcome.setActor(ACTOR);
// wrong operation - should be null
outcome.setOperation(null);
assertFalse(oper.isSameOperation(outcome));
- outcome.setOperation(TARGET);
+ outcome.setOperation(MY_SINK);
assertFalse(oper.isSameOperation(outcome));
outcome.setOperation(OPERATION);
assertTrue(oper.isSameOperation(outcome));
}
- /**
- * Tests handleFailure() when the outcome is a success.
- */
- @Test
- public void testHandlePreprocessorFailureTrue() {
- oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
- verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
- }
-
- /**
- * Tests handleFailure() when the outcome is <i>not</i> a success.
- */
- @Test
- public void testHandlePreprocessorFailureFalse() throws Exception {
- oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
- verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
- }
-
- /**
- * Tests handleFailure() when the outcome is {@code null}.
- */
- @Test
- public void testHandlePreprocessorFailureNull() throws Exception {
- // arrange to return null from the preprocessor
- oper.setGuard(CompletableFuture.completedFuture(null));
-
- verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
- }
-
@Test
public void testFromException() {
// arrange to generate an exception when operation runs
oper.setGenException(true);
- verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
+ verifyRun("testFromException", 1, 1, OperationResult.FAILURE_EXCEPTION);
}
/**
*/
@Test
public void testFromExceptionNoExcept() {
- verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
+ verifyRun("testFromExceptionNoExcept", 1, 1, OperationResult.SUCCESS);
}
/**
@Test
public void testAnyOf() throws Exception {
// first task completes, others do not
- List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
- final OperationOutcome outcome = params.makeOutcome();
+ final OperationOutcome outcome = params.makeOutcome(null);
- tasks.add(CompletableFuture.completedFuture(outcome));
- tasks.add(new CompletableFuture<>());
- tasks.add(new CompletableFuture<>());
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> new CompletableFuture<>());
+ tasks.add(() -> null);
+ tasks.add(() -> new CompletableFuture<>());
CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+ // repeat using array form
+ @SuppressWarnings("unchecked")
+ Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
+ result = oper.anyOf(tasks.toArray(taskArray));
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertSame(outcome, result.get());
// second task completes, others do not
- tasks = new LinkedList<>();
-
- tasks.add(new CompletableFuture<>());
- tasks.add(CompletableFuture.completedFuture(outcome));
- tasks.add(new CompletableFuture<>());
+ tasks.clear();
+ tasks.add(() -> new CompletableFuture<>());
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> new CompletableFuture<>());
result = oper.anyOf(tasks);
- assertTrue(executor.runAll());
-
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertSame(outcome, result.get());
// third task completes, others do not
- tasks = new LinkedList<>();
-
- tasks.add(new CompletableFuture<>());
- tasks.add(new CompletableFuture<>());
- tasks.add(CompletableFuture.completedFuture(outcome));
+ tasks.clear();
+ tasks.add(() -> new CompletableFuture<>());
+ tasks.add(() -> new CompletableFuture<>());
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
result = oper.anyOf(tasks);
- assertTrue(executor.runAll());
-
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertSame(outcome, result.get());
}
@Test
@SuppressWarnings("unchecked")
public void testAnyOfEdge() throws Exception {
- List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
// zero items: check both using a list and using an array
- assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf(tasks));
- assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf());
+ assertNull(oper.anyOf(tasks));
+ assertNull(oper.anyOf());
// one item: : check both using a list and using an array
CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
- tasks.add(future1);
+ tasks.add(() -> future1);
assertSame(future1, oper.anyOf(tasks));
- assertSame(future1, oper.anyOf(future1));
+ assertSame(future1, oper.anyOf(() -> future1));
}
- /**
- * Tests both flavors of allOf(), because one invokes the other.
- */
@Test
- public void testAllOf() throws Exception {
- List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+ public void testAllOfArray() throws Exception {
+ final OperationOutcome outcome = params.makeOutcome(null);
+
+ CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
+
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome> result =
+ oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
+
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertFalse(result.isDone());
+ future1.complete(outcome);
+
+ // complete 3 before 2
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertFalse(result.isDone());
+ future3.complete(outcome);
+
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertFalse(result.isDone());
+ future2.complete(outcome);
- final OperationOutcome outcome = params.makeOutcome();
+ // all of them are now done
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+ }
+
+ @Test
+ public void testAllOfList() throws Exception {
+ final OperationOutcome outcome = params.makeOutcome(null);
CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
- tasks.add(future1);
- tasks.add(future2);
- tasks.add(future3);
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+ tasks.add(() -> future1);
+ tasks.add(() -> future2);
+ tasks.add(() -> null);
+ tasks.add(() -> future3);
CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertFalse(result.isDone());
future1.complete(outcome);
// complete 3 before 2
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertFalse(result.isDone());
future3.complete(outcome);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertFalse(result.isDone());
future2.complete(outcome);
// all of them are now done
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertSame(outcome, result.get());
}
@Test
@SuppressWarnings("unchecked")
public void testAllOfEdge() throws Exception {
- List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
// zero items: check both using a list and using an array
- assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf(tasks));
- assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf());
+ assertNull(oper.allOf(tasks));
+ assertNull(oper.allOf());
// one item: : check both using a list and using an array
CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
- tasks.add(future1);
+ tasks.add(() -> future1);
assertSame(future1, oper.allOf(tasks));
- assertSame(future1, oper.allOf(future1));
+ assertSame(future1, oper.allOf(() -> future1));
+ }
+
+ @Test
+ public void testAttachFutures() throws Exception {
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+
+ // third task throws an exception during construction
+ CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
+ tasks.add(() -> future1);
+ tasks.add(() -> future2);
+ tasks.add(() -> {
+ throw new IllegalStateException(EXPECTED_EXCEPTION);
+ });
+ tasks.add(() -> future3);
+
+ assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
+
+ // should have canceled the first two, but not the last
+ assertTrue(future1.isCancelled());
+ assertTrue(future2.isCancelled());
+ assertFalse(future3.isCancelled());
}
@Test
public void testCombineOutcomes() throws Exception {
// only one outcome
- verifyOutcomes(0, PolicyResult.SUCCESS);
- verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
+ verifyOutcomes(0, OperationResult.SUCCESS);
+ verifyOutcomes(0, OperationResult.FAILURE_EXCEPTION);
// maximum is in different positions
- verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
- verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
- verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
-
- // null outcome
- final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
- tasks.add(CompletableFuture.completedFuture(null));
+ verifyOutcomes(0, OperationResult.FAILURE, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD);
+ verifyOutcomes(1, OperationResult.SUCCESS, OperationResult.FAILURE, OperationResult.FAILURE_GUARD);
+ verifyOutcomes(2, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD, OperationResult.FAILURE);
+
+ // null outcome - takes precedence over a success
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+ tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
+ tasks.add(() -> CompletableFuture.completedFuture(null));
+ tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertNull(result.get());
IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
tasks.clear();
- tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
- tasks.add(CompletableFuture.failedFuture(except));
- tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
+ tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
+ tasks.add(() -> CompletableFuture.failedFuture(except));
+ tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
result = oper.allOf(tasks);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isCompletedExceptionally());
result.whenComplete((unused, thrown) -> assertSame(except, thrown));
}
- private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
- List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
+ /**
+ * Tests both flavors of sequence(), because one invokes the other.
+ */
+ @Test
+ public void testSequence() throws Exception {
+ final OperationOutcome outcome = params.makeOutcome(null);
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> null);
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+
+ CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+
+ // repeat using array form
+ @SuppressWarnings("unchecked")
+ Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
+ result = oper.sequence(tasks.toArray(taskArray));
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(result.isDone());
+ assertSame(outcome, result.get());
+
+ // second task fails, third should not run
+ OperationOutcome failure = params.makeOutcome(null);
+ failure.setResult(OperationResult.FAILURE);
+ tasks.clear();
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> CompletableFuture.completedFuture(failure));
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
+
+ result = oper.sequence(tasks);
+ assertTrue(executor.runAll(MAX_REQUESTS));
+ assertTrue(result.isDone());
+ assertSame(failure, result.get());
+ }
+
+ /**
+ * Tests both flavors of sequence(), for edge cases: zero items, and one item.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSequenceEdge() throws Exception {
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
+
+ // zero items: check both using a list and using an array
+ assertNull(oper.sequence(tasks));
+ assertNull(oper.sequence());
+
+ // one item: : check both using a list and using an array
+ CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
+ tasks.add(() -> future1);
+
+ assertSame(future1, oper.sequence(tasks));
+ assertSame(future1, oper.sequence(() -> future1));
+ }
+
+ private void verifyOutcomes(int expected, OperationResult... results) throws Exception {
+ List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
OperationOutcome expectedOutcome = null;
for (int count = 0; count < results.length; ++count) {
- OperationOutcome outcome = params.makeOutcome();
+ OperationOutcome outcome = params.makeOutcome(null);
outcome.setResult(results[count]);
- tasks.add(CompletableFuture.completedFuture(outcome));
+ tasks.add(() -> CompletableFuture.completedFuture(outcome));
if (count == expected) {
expectedOutcome = outcome;
CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
assertTrue(result.isDone());
assertSame(expectedOutcome, result.get());
}
- private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
- final OperationOutcome taskOutcome) {
-
- return outcome -> CompletableFuture.completedFuture(taskOutcome);
- }
-
@Test
public void testDetmPriority() throws CoderException {
assertEquals(1, oper.detmPriority(null));
- OperationOutcome outcome = params.makeOutcome();
+ OperationOutcome outcome = params.makeOutcome(null);
- Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
- PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
- PolicyResult.FAILURE_EXCEPTION, 6);
+ Map<OperationResult, Integer> map = Map.of(OperationResult.SUCCESS, 0, OperationResult.FAILURE_GUARD, 2,
+ OperationResult.FAILURE_RETRIES, 3, OperationResult.FAILURE, 4, OperationResult.FAILURE_TIMEOUT, 5,
+ OperationResult.FAILURE_EXCEPTION, 6);
- for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
+ for (Entry<OperationResult, Integer> ent : map.entrySet()) {
outcome.setResult(ent.getKey());
assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
}
assertEquals(1, oper.detmPriority(outcome));
}
- /**
- * Tests doTask(Future) when the controller is not running.
- */
- @Test
- public void testDoTaskFutureNotRunning() throws Exception {
- CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- controller.complete(params.makeOutcome());
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, params.makeOutcome(), taskFuture);
- assertFalse(future.isDone());
- assertTrue(executor.runAll());
-
- // should not have run the task
- assertFalse(future.isDone());
-
- // should have canceled the task future
- assertTrue(taskFuture.isCancelled());
- }
-
- /**
- * Tests doTask(Future) when the previous outcome was successful.
- */
- @Test
- public void testDoTaskFutureSuccess() throws Exception {
- CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
- final OperationOutcome taskOutcome = params.makeOutcome();
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, params.makeOutcome(), taskFuture);
-
- taskFuture.complete(taskOutcome);
- assertTrue(executor.runAll());
-
- assertTrue(future.isDone());
- assertSame(taskOutcome, future.get());
-
- // controller should not be done yet
- assertFalse(controller.isDone());
- }
-
- /**
- * Tests doTask(Future) when the previous outcome was failed.
- */
- @Test
- public void testDoTaskFutureFailure() throws Exception {
- CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
- final OperationOutcome failedOutcome = params.makeOutcome();
- failedOutcome.setResult(PolicyResult.FAILURE);
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, failedOutcome, taskFuture);
- assertFalse(future.isDone());
- assertTrue(executor.runAll());
-
- // should not have run the task
- assertFalse(future.isDone());
-
- // should have canceled the task future
- assertTrue(taskFuture.isCancelled());
-
- // controller SHOULD be done now
- assertTrue(controller.isDone());
- assertSame(failedOutcome, controller.get());
- }
-
- /**
- * Tests doTask(Future) when the previous outcome was failed, but not checking
- * success.
- */
- @Test
- public void testDoTaskFutureUncheckedFailure() throws Exception {
- CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
- final OperationOutcome failedOutcome = params.makeOutcome();
- failedOutcome.setResult(PolicyResult.FAILURE);
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, failedOutcome, taskFuture);
- assertFalse(future.isDone());
-
- // complete the task
- OperationOutcome taskOutcome = params.makeOutcome();
- taskFuture.complete(taskOutcome);
-
- assertTrue(executor.runAll());
-
- // should have run the task
- assertTrue(future.isDone());
-
- assertTrue(future.isDone());
- assertSame(taskOutcome, future.get());
-
- // controller should not be done yet
- assertFalse(controller.isDone());
- }
-
- /**
- * Tests doTask(Function) when the controller is not running.
- */
- @Test
- public void testDoTaskFunctionNotRunning() throws Exception {
- AtomicBoolean invoked = new AtomicBoolean();
-
- Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
- invoked.set(true);
- return CompletableFuture.completedFuture(params.makeOutcome());
- };
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- controller.complete(params.makeOutcome());
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(params.makeOutcome());
- assertFalse(future.isDone());
- assertTrue(executor.runAll());
-
- // should not have run the task
- assertFalse(future.isDone());
-
- // should not have even invoked the task
- assertFalse(invoked.get());
- }
-
- /**
- * Tests doTask(Function) when the previous outcome was successful.
- */
- @Test
- public void testDoTaskFunctionSuccess() throws Exception {
- final OperationOutcome taskOutcome = params.makeOutcome();
-
- final OperationOutcome failedOutcome = params.makeOutcome();
-
- Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
-
- assertTrue(future.isDone());
- assertSame(taskOutcome, future.get());
-
- // controller should not be done yet
- assertFalse(controller.isDone());
- }
-
- /**
- * Tests doTask(Function) when the previous outcome was failed.
- */
- @Test
- public void testDoTaskFunctionFailure() throws Exception {
- final OperationOutcome failedOutcome = params.makeOutcome();
- failedOutcome.setResult(PolicyResult.FAILURE);
-
- AtomicBoolean invoked = new AtomicBoolean();
-
- Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
- invoked.set(true);
- return CompletableFuture.completedFuture(params.makeOutcome());
- };
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
- assertFalse(future.isDone());
- assertTrue(executor.runAll());
-
- // should not have run the task
- assertFalse(future.isDone());
-
- // should not have even invoked the task
- assertFalse(invoked.get());
-
- // controller should have the failed task
- assertTrue(controller.isDone());
- assertSame(failedOutcome, controller.get());
- }
-
- /**
- * Tests doTask(Function) when the previous outcome was failed, but not checking
- * success.
- */
- @Test
- public void testDoTaskFunctionUncheckedFailure() throws Exception {
- final OperationOutcome taskOutcome = params.makeOutcome();
-
- final OperationOutcome failedOutcome = params.makeOutcome();
- failedOutcome.setResult(PolicyResult.FAILURE);
-
- Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
-
- PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(failedOutcome);
-
- assertTrue(future.isDone());
- assertSame(taskOutcome, future.get());
-
- // controller should not be done yet
- assertFalse(controller.isDone());
- }
-
/**
* Tests callbackStarted() when the pipeline has already been stopped.
*/
oper = new MyOper();
future.set(oper.start());
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
// should have only run once
assertEquals(1, numStart);
oper = new MyOper();
future.set(oper.start());
- assertTrue(executor.runAll());
+ assertTrue(executor.runAll(MAX_REQUESTS));
// should not have been set
assertNull(opend);
outcome = new OperationOutcome();
oper.setOutcome(outcome, timex);
assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
- assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
+ assertEquals(OperationResult.FAILURE_TIMEOUT, outcome.getResult());
outcome = new OperationOutcome();
oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
- assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
+ assertEquals(OperationResult.FAILURE_EXCEPTION, outcome.getResult());
}
@Test
OperationOutcome outcome;
outcome = new OperationOutcome();
- oper.setOutcome(outcome, PolicyResult.SUCCESS);
+ oper.setOutcome(outcome, OperationResult.SUCCESS);
assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
- assertEquals(PolicyResult.SUCCESS, outcome.getResult());
+ assertEquals(OperationResult.SUCCESS, outcome.getResult());
- for (PolicyResult result : FAILURE_RESULTS) {
+ oper.setOutcome(outcome, OperationResult.SUCCESS);
+ assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
+ assertEquals(OperationResult.SUCCESS, outcome.getResult());
+
+ for (OperationResult result : FAILURE_RESULTS) {
outcome = new OperationOutcome();
oper.setOutcome(outcome, result);
assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
assertTrue(oper.isTimeout(new CompletionException(timex)));
}
+ @Test
+ public void testLogMessage() {
+ final String infraStr = SINK_INFRA.toString();
+
+ // log structured data
+ appender.clearExtractions();
+ oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
+ List<String> output = appender.getExtracted();
+ assertEquals(1, output.size());
+
+ assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
+ .contains("{\n \"text\": \"my-text\"\n}");
+
+ // repeat with a response
+ appender.clearExtractions();
+ oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
+ output = appender.getExtracted();
+ assertEquals(1, output.size());
+
+ assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
+ .contains("{\n \"text\": \"my-text\"\n}");
+
+ // log a plain string
+ appender.clearExtractions();
+ oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
+ output = appender.getExtracted();
+ assertEquals(1, output.size());
+ assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
+
+ // log a null request
+ appender.clearExtractions();
+ oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
+ output = appender.getExtracted();
+ assertEquals(1, output.size());
+
+ assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
+
+ // generate exception from coder
+ setOperCoderException();
+
+ appender.clearExtractions();
+ oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
+ output = appender.getExtracted();
+ assertEquals(2, output.size());
+ assertThat(output.get(0)).contains("cannot pretty-print request");
+ assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
+
+ // repeat with a response
+ appender.clearExtractions();
+ oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
+ output = appender.getExtracted();
+ assertEquals(2, output.size());
+ assertThat(output.get(0)).contains("cannot pretty-print response");
+ assertThat(output.get(1)).contains(MY_SOURCE);
+ }
+
@Test
public void testGetRetry() {
assertEquals(0, oper.getRetry(null));
@Test
public void testGetRetryWait() {
// need an operator that doesn't override the retry time
- OperationPartial oper2 = new OperationPartial(params, operator) {};
+ OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
}
+ @Test
+ public void testGetTargetEntity() {
+ // get it from the params
+ assertEquals(MY_TARGET_ENTITY, oper.getTargetEntity());
+
+ // now get it from the properties
+ oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, "entityX");
+ assertEquals("entityX", oper.getTargetEntity());
+ }
+
@Test
public void testGetTimeOutMs() {
assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
++numStart;
tstart = oper.getStart();
opstart = oper;
+ starts.add(oper);
}
private void completer(OperationOutcome oper) {
++numEnd;
opend = oper;
+ ends.add(oper);
}
/**
}
private OperationOutcome makeSuccess() {
- OperationOutcome outcome = params.makeOutcome();
- outcome.setResult(PolicyResult.SUCCESS);
-
- return outcome;
- }
-
- private OperationOutcome makeFailure() {
- OperationOutcome outcome = params.makeOutcome();
- outcome.setResult(PolicyResult.FAILURE);
+ OperationOutcome outcome = params.makeOutcome(null);
+ outcome.setResult(OperationResult.SUCCESS);
return outcome;
}
* @param expectedResult expected outcome
*/
private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
- PolicyResult expectedResult) {
+ OperationResult expectedResult) {
- String expectedSubRequestId =
- (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
-
- verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
+ verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
}
/**
* @param expectedCallbacks number of callbacks expected
* @param expectedOperations number of operation invocations expected
* @param expectedResult expected outcome
- * @param expectedSubRequestId expected sub request ID
* @param manipulator function to modify the future returned by
* {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
* in the executor are run
*/
- private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
- String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
+ private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
+ OperationResult expectedResult, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
+
+ tstart = null;
+ opstart = null;
+ opend = null;
+ starts.clear();
+ ends.clear();
CompletableFuture<OperationOutcome> future = oper.start();
manipulator.accept(future);
- assertTrue(testName, executor.runAll());
+ assertTrue(testName, executor.runAll(MAX_REQUESTS));
assertEquals(testName, expectedCallbacks, numStart);
assertEquals(testName, expectedCallbacks, numEnd);
try {
assertTrue(future.isDone());
- assertSame(testName, opend, future.get());
+ assertEquals(testName, opend, future.get());
+
+ // "start" is never final
+ for (OperationOutcome outcome : starts) {
+ assertFalse(testName, outcome.isFinalOutcome());
+ }
+
+ // only the last "complete" is final
+ assertTrue(testName, ends.removeLast().isFinalOutcome());
+
+ for (OperationOutcome outcome : ends) {
+ assertFalse(outcome.isFinalOutcome());
+ }
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
if (expectedOperations > 0) {
- assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
+ assertNotNull(testName, oper.getSubRequestId());
+ assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
+ assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
}
}
assertEquals(testName, expectedOperations, oper.getCount());
}
+ /**
+ * Creates a new {@link #oper} whose coder will throw an exception.
+ */
+ private void setOperCoderException() {
+ oper = new MyOper() {
+ @Override
+ protected Coder getCoder() {
+ return new StandardCoder() {
+ @Override
+ public String encode(Object object, boolean pretty) throws CoderException {
+ throw new CoderException(EXPECTED_EXCEPTION);
+ }
+ };
+ }
+ };
+ }
+
+
+ @Getter
+ public static class MyData {
+ private String text = TEXT;
+ }
+
+
private class MyOper extends OperationPartial {
@Getter
private int count = 0;
@Setter
private boolean genException;
-
@Setter
private int maxFailures = 0;
-
@Setter
- private CompletableFuture<OperationOutcome> guard;
+ private CompletableFuture<OperationOutcome> preProc;
public MyOper() {
- super(OperationPartialTest.this.params, operator);
+ super(OperationPartialTest.this.params, config, PROP_NAMES);
}
@Override
operation.setSubRequestId(String.valueOf(attempt));
if (count > maxFailures) {
- operation.setResult(PolicyResult.SUCCESS);
+ operation.setResult(OperationResult.SUCCESS);
} else {
- operation.setResult(PolicyResult.FAILURE);
+ operation.setResult(OperationResult.FAILURE);
}
return operation;
}
- @Override
- protected CompletableFuture<OperationOutcome> startGuardAsync() {
- return (guard != null ? guard : super.startGuardAsync());
- }
-
@Override
protected long getRetryWaitMs() {
/*
return 0L;
}
}
-
- /**
- * Executor that will run tasks until the queue is empty or a maximum number of tasks
- * have been executed. Doesn't actually run anything until {@link #runAll()} is
- * invoked.
- */
- private static class MyExec implements Executor {
- private static final int MAX_TASKS = MAX_PARALLEL_REQUESTS * 100;
-
- private Queue<Runnable> commands = new LinkedList<>();
-
- public MyExec() {
- // do nothing
- }
-
- public int getQueueLength() {
- return commands.size();
- }
-
- @Override
- public void execute(Runnable command) {
- commands.add(command);
- }
-
- public boolean runAll() {
- for (int count = 0; count < MAX_TASKS && !commands.isEmpty(); ++count) {
- commands.remove().run();
- }
-
- return commands.isEmpty();
- }
- }
}