2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import static org.assertj.core.api.Assertions.assertThat;
 
  24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 
  25 import static org.junit.Assert.assertEquals;
 
  26 import static org.junit.Assert.assertFalse;
 
  27 import static org.junit.Assert.assertNotNull;
 
  28 import static org.junit.Assert.assertNull;
 
  29 import static org.junit.Assert.assertSame;
 
  30 import static org.junit.Assert.assertTrue;
 
  31 import static org.mockito.ArgumentMatchers.any;
 
  32 import static org.mockito.Mockito.verify;
 
  33 import static org.mockito.Mockito.when;
 
  35 import ch.qos.logback.classic.Logger;
 
  36 import java.time.Instant;
 
  37 import java.util.ArrayDeque;
 
  38 import java.util.Arrays;
 
  39 import java.util.Collections;
 
  40 import java.util.Deque;
 
  41 import java.util.LinkedList;
 
  42 import java.util.List;
 
  44 import java.util.Map.Entry;
 
  45 import java.util.UUID;
 
  46 import java.util.concurrent.CompletableFuture;
 
  47 import java.util.concurrent.CompletionException;
 
  48 import java.util.concurrent.ExecutionException;
 
  49 import java.util.concurrent.ForkJoinPool;
 
  50 import java.util.concurrent.Future;
 
  51 import java.util.concurrent.TimeUnit;
 
  52 import java.util.concurrent.TimeoutException;
 
  53 import java.util.concurrent.atomic.AtomicReference;
 
  54 import java.util.function.Consumer;
 
  55 import java.util.function.Supplier;
 
  56 import java.util.stream.Collectors;
 
  59 import org.junit.AfterClass;
 
  60 import org.junit.Before;
 
  61 import org.junit.BeforeClass;
 
  62 import org.junit.Test;
 
  63 import org.mockito.ArgumentCaptor;
 
  64 import org.mockito.Mock;
 
  65 import org.mockito.MockitoAnnotations;
 
  66 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 
  67 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 
  68 import org.onap.policy.common.utils.coder.Coder;
 
  69 import org.onap.policy.common.utils.coder.CoderException;
 
  70 import org.onap.policy.common.utils.coder.StandardCoder;
 
  71 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
 
  72 import org.onap.policy.common.utils.time.PseudoExecutor;
 
  73 import org.onap.policy.controlloop.ControlLoopOperation;
 
  74 import org.onap.policy.controlloop.VirtualControlLoopEvent;
 
  75 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
 
  76 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 
  77 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 
  78 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
 
  79 import org.onap.policy.controlloop.actorserviceprovider.Operator;
 
  80 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
 
  81 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  82 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
 
  83 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
 
  84 import org.onap.policy.controlloop.policy.PolicyResult;
 
  85 import org.slf4j.LoggerFactory;
 
  87 public class OperationPartialTest {
 
  88     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
 
  89     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
 
  90     private static final int MAX_REQUESTS = 100;
 
  91     private static final int MAX_PARALLEL = 10;
 
  92     private static final String EXPECTED_EXCEPTION = "expected exception";
 
  93     private static final String ACTOR = "my-actor";
 
  94     private static final String OPERATION = "my-operation";
 
  95     private static final String MY_SINK = "my-sink";
 
  96     private static final String MY_SOURCE = "my-source";
 
  97     private static final String MY_TARGET_ENTITY = "my-entity";
 
  98     private static final String TEXT = "my-text";
 
  99     private static final int TIMEOUT = 1000;
 
 100     private static final UUID REQ_ID = UUID.randomUUID();
 
 102     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
 
 103                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
 
 106      * Used to attach an appender to the class' logger.
 
 108     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
 
 109     private static final ExtractAppender appender = new ExtractAppender();
 
 111     private static final List<String> PROP_NAMES = List.of("hello", "world");
 
 114     private ActorService service;
 
 116     private Actor guardActor;
 
 118     private Operator guardOperator;
 
 120     private Operation guardOperation;
 
 122     private VirtualControlLoopEvent event;
 
 123     private ControlLoopEventContext context;
 
 124     private PseudoExecutor executor;
 
 125     private ControlLoopOperationParams params;
 
 129     private int numStart;
 
 132     private Instant tstart;
 
 134     private OperationOutcome opstart;
 
 135     private OperationOutcome opend;
 
 137     private Deque<OperationOutcome> starts;
 
 138     private Deque<OperationOutcome> ends;
 
 140     private OperatorConfig config;
 
 143      * Attaches the appender to the logger.
 
 146     public static void setUpBeforeClass() throws Exception {
 
 148          * Attach appender to the logger.
 
 150         appender.setContext(logger.getLoggerContext());
 
 153         logger.addAppender(appender);
 
 157      * Stops the appender.
 
 160     public static void tearDownAfterClass() {
 
 165      * Initializes the fields, including {@link #oper}.
 
 168     public void setUp() {
 
 169         MockitoAnnotations.initMocks(this);
 
 171         event = new VirtualControlLoopEvent();
 
 172         event.setRequestId(REQ_ID);
 
 174         context = new ControlLoopEventContext(event);
 
 175         executor = new PseudoExecutor();
 
 177         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
 
 178                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
 
 179                         .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
 
 181         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
 
 182         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
 
 183         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
 
 184         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
 
 186         config = new OperatorConfig(executor);
 
 195         starts = new ArrayDeque<>(10);
 
 196         ends = new ArrayDeque<>(10);
 
 200     public void testOperatorPartial_testGetActorName_testGetName() {
 
 201         assertEquals(ACTOR, oper.getActorName());
 
 202         assertEquals(OPERATION, oper.getName());
 
 203         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
 
 207     public void testGetBlockingThread() throws Exception {
 
 208         CompletableFuture<Void> future = new CompletableFuture<>();
 
 210         // use the real executor
 
 211         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
 
 213             public Operation buildOperation(ControlLoopOperationParams params) {
 
 218         oper2.getBlockingExecutor().execute(() -> future.complete(null));
 
 220         assertNull(future.get(5, TimeUnit.SECONDS));
 
 224     public void testGetPropertyNames() {
 
 225         assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
 
 229     public void testGetProperty_testSetProperty() {
 
 230         oper.setProperty("propertyA", "valueA");
 
 231         oper.setProperty("propertyB", "valueB");
 
 232         oper.setProperty("propertyC", 20);
 
 234         assertEquals("valueA", oper.getProperty("propertyA"));
 
 235         assertEquals("valueB", oper.getProperty("propertyB"));
 
 236         assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
 
 240     public void testStart() {
 
 241         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
 
 245      * Tests start() with multiple running requests.
 
 248     public void testStartMultiple() {
 
 249         for (int count = 0; count < MAX_PARALLEL; ++count) {
 
 253         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
 
 255         assertNotNull(opstart);
 
 256         assertNotNull(opend);
 
 257         assertEquals(PolicyResult.SUCCESS, opend.getResult());
 
 259         assertEquals(MAX_PARALLEL, numStart);
 
 260         assertEquals(MAX_PARALLEL, oper.getCount());
 
 261         assertEquals(MAX_PARALLEL, numEnd);
 
 265      * Tests startPreprocessor() when the preprocessor returns a failure.
 
 268     public void testStartPreprocessorFailure() {
 
 269         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
 
 271         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
 
 275      * Tests startPreprocessor() when the preprocessor throws an exception.
 
 278     public void testStartPreprocessorException() {
 
 279         // arrange for the preprocessor to throw an exception
 
 280         oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
 
 282         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
 
 286      * Tests startPreprocessor() when the pipeline is not running.
 
 289     public void testStartPreprocessorNotRunning() {
 
 290         // arrange for the preprocessor to return success, which will be ignored
 
 291         // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
 
 293         oper.start().cancel(false);
 
 294         assertTrue(executor.runAll(MAX_REQUESTS));
 
 299         assertEquals(0, numStart);
 
 300         assertEquals(0, oper.getCount());
 
 301         assertEquals(0, numEnd);
 
 305      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
 
 308     public void testStartPreprocessorBuilderException() {
 
 309         oper = new MyOper() {
 
 311             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
 
 312                 throw new IllegalStateException(EXPECTED_EXCEPTION);
 
 316         assertThatIllegalStateException().isThrownBy(() -> oper.start());
 
 318         // should be nothing in the queue
 
 319         assertEquals(0, executor.getQueueLength());
 
 323     public void testStartPreprocessorAsync() {
 
 324         assertNull(oper.startPreprocessorAsync());
 
 328     public void testStartGuardAsync() throws Exception {
 
 329         CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
 
 330         assertTrue(future.isDone());
 
 331         assertEquals(PolicyResult.SUCCESS, future.get().getResult());
 
 333         // verify the parameters that were passed
 
 334         ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
 
 335                         ArgumentCaptor.forClass(ControlLoopOperationParams.class);
 
 336         verify(guardOperator).buildOperation(paramsCaptor.capture());
 
 338         params = paramsCaptor.getValue();
 
 339         assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
 
 340         assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
 
 341         assertNull(params.getRetry());
 
 342         assertNull(params.getTimeoutSec());
 
 344         Map<String, Object> payload = params.getPayload();
 
 345         assertNotNull(payload);
 
 347         assertEquals(oper.makeGuardPayload(), payload);
 
 351      * Tests startGuardAsync() when preprocessing is disabled.
 
 354     public void testStartGuardAsyncDisabled() {
 
 355         params = params.toBuilder().preprocessed(true).build();
 
 356         assertNull(new MyOper().startGuardAsync());
 
 360     public void testMakeGuardPayload() {
 
 361         Map<String, Object> payload = oper.makeGuardPayload();
 
 362         assertSame(REQ_ID, payload.get("requestId"));
 
 364         // request id changes, so remove it
 
 365         payload.remove("requestId");
 
 367         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity}", payload.toString());
 
 369         // repeat, but with closed loop name
 
 370         event.setClosedLoopControlName("my-loop");
 
 371         payload = oper.makeGuardPayload();
 
 372         payload.remove("requestId");
 
 373         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity, clname=my-loop}", payload.toString());
 
 377     public void testStartOperationAsync() {
 
 379         assertTrue(executor.runAll(MAX_REQUESTS));
 
 381         assertEquals(1, oper.getCount());
 
 385     public void testIsSuccess() {
 
 386         assertFalse(oper.isSuccess(null));
 
 388         OperationOutcome outcome = new OperationOutcome();
 
 390         outcome.setResult(PolicyResult.SUCCESS);
 
 391         assertTrue(oper.isSuccess(outcome));
 
 393         for (PolicyResult failure : FAILURE_RESULTS) {
 
 394             outcome.setResult(failure);
 
 395             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
 
 400     public void testIsActorFailed() {
 
 401         assertFalse(oper.isActorFailed(null));
 
 403         OperationOutcome outcome = params.makeOutcome(null);
 
 406         outcome.setResult(PolicyResult.SUCCESS);
 
 407         assertFalse(oper.isActorFailed(outcome));
 
 409         outcome.setResult(PolicyResult.FAILURE_RETRIES);
 
 410         assertFalse(oper.isActorFailed(outcome));
 
 413         outcome.setResult(PolicyResult.FAILURE);
 
 416         outcome.setActor(MY_SINK);
 
 417         assertFalse(oper.isActorFailed(outcome));
 
 418         outcome.setActor(null);
 
 419         assertFalse(oper.isActorFailed(outcome));
 
 420         outcome.setActor(ACTOR);
 
 422         // incorrect operation
 
 423         outcome.setOperation(MY_SINK);
 
 424         assertFalse(oper.isActorFailed(outcome));
 
 425         outcome.setOperation(null);
 
 426         assertFalse(oper.isActorFailed(outcome));
 
 427         outcome.setOperation(OPERATION);
 
 430         assertTrue(oper.isActorFailed(outcome));
 
 434     public void testDoOperation() {
 
 436          * Use an operation that doesn't override doOperation().
 
 438         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
 
 441         assertTrue(executor.runAll(MAX_REQUESTS));
 
 443         assertNotNull(opend);
 
 444         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
 
 448     public void testTimeout() throws Exception {
 
 450         // use a real executor
 
 451         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
 
 453         // trigger timeout very quickly
 
 454         oper = new MyOper() {
 
 456             protected long getTimeoutMs(Integer timeoutSec) {
 
 461             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
 463                 OperationOutcome outcome2 = params.makeOutcome(null);
 
 464                 outcome2.setResult(PolicyResult.SUCCESS);
 
 467                  * Create an incomplete future that will timeout after the operation's
 
 468                  * timeout. If it fires before the other timer, then it will return a
 
 471                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
 
 472                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
 
 473                                 params.getExecutor());
 
 479         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
 
 483      * Tests retry functions, when the count is set to zero and retries are exhausted.
 
 486     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
 
 487         params = params.toBuilder().retry(0).build();
 
 489         // new params, thus need a new operation
 
 492         oper.setMaxFailures(10);
 
 494         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
 
 498      * Tests retry functions, when the count is null and retries are exhausted.
 
 501     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
 
 502         params = params.toBuilder().retry(null).build();
 
 504         // new params, thus need a new operation
 
 507         oper.setMaxFailures(10);
 
 509         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
 
 513      * Tests retry functions, when retries are exhausted.
 
 516     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
 
 517         final int maxRetries = 3;
 
 518         params = params.toBuilder().retry(maxRetries).build();
 
 520         // new params, thus need a new operation
 
 523         oper.setMaxFailures(10);
 
 525         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
 
 526                         PolicyResult.FAILURE_RETRIES);
 
 530      * Tests retry functions, when a success follows some retries.
 
 533     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
 
 534         params = params.toBuilder().retry(10).build();
 
 536         // new params, thus need a new operation
 
 539         final int maxFailures = 3;
 
 540         oper.setMaxFailures(maxFailures);
 
 542         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
 
 543                         PolicyResult.SUCCESS);
 
 547      * Tests retry functions, when the outcome is {@code null}.
 
 550     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
 
 552         // arrange to return null from doOperation()
 
 553         oper = new MyOper() {
 
 555             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
 
 558                 super.doOperation(attempt, outcome);
 
 563         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, noop());
 
 567     public void testSleep() throws Exception {
 
 568         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
 
 569         assertTrue(future.isDone());
 
 570         assertNull(future.get());
 
 573         future = oper.sleep(0, TimeUnit.SECONDS);
 
 574         assertTrue(future.isDone());
 
 575         assertNull(future.get());
 
 578          * Start a second sleep we can use to check the first while it's running.
 
 580         tstart = Instant.now();
 
 581         future = oper.sleep(100, TimeUnit.MILLISECONDS);
 
 583         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
 
 585         // wait for second to complete and verify that the first has not completed
 
 587         assertFalse(future.isDone());
 
 589         // wait for second to complete
 
 592         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
 
 593         assertTrue(diff >= 99);
 
 597     public void testIsSameOperation() {
 
 598         assertFalse(oper.isSameOperation(null));
 
 600         OperationOutcome outcome = params.makeOutcome(null);
 
 602         // wrong actor - should be false
 
 603         outcome.setActor(null);
 
 604         assertFalse(oper.isSameOperation(outcome));
 
 605         outcome.setActor(MY_SINK);
 
 606         assertFalse(oper.isSameOperation(outcome));
 
 607         outcome.setActor(ACTOR);
 
 609         // wrong operation - should be null
 
 610         outcome.setOperation(null);
 
 611         assertFalse(oper.isSameOperation(outcome));
 
 612         outcome.setOperation(MY_SINK);
 
 613         assertFalse(oper.isSameOperation(outcome));
 
 614         outcome.setOperation(OPERATION);
 
 616         assertTrue(oper.isSameOperation(outcome));
 
 620      * Tests handleFailure() when the outcome is a success.
 
 623     public void testHandlePreprocessorFailureSuccess() {
 
 624         oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
 
 625         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
 
 629      * Tests handleFailure() when the outcome is <i>not</i> a success.
 
 632     public void testHandlePreprocessorFailureFailed() throws Exception {
 
 633         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
 
 634         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
 
 638      * Tests handleFailure() when the outcome is {@code null}.
 
 641     public void testHandlePreprocessorFailureNull() throws Exception {
 
 642         // arrange to return a null outcome from the preprocessor
 
 643         oper.setPreProc(CompletableFuture.completedFuture(null));
 
 644         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
 
 648     public void testFromException() {
 
 649         // arrange to generate an exception when operation runs
 
 650         oper.setGenException(true);
 
 652         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
 
 656      * Tests fromException() when there is no exception.
 
 659     public void testFromExceptionNoExcept() {
 
 660         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
 
 664      * Tests both flavors of anyOf(), because one invokes the other.
 
 667     public void testAnyOf() throws Exception {
 
 668         // first task completes, others do not
 
 669         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 671         final OperationOutcome outcome = params.makeOutcome(null);
 
 673         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 674         tasks.add(() -> new CompletableFuture<>());
 
 675         tasks.add(() -> null);
 
 676         tasks.add(() -> new CompletableFuture<>());
 
 678         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
 
 679         assertTrue(executor.runAll(MAX_REQUESTS));
 
 680         assertTrue(result.isDone());
 
 681         assertSame(outcome, result.get());
 
 683         // repeat using array form
 
 684         @SuppressWarnings("unchecked")
 
 685         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
 
 686         result = oper.anyOf(tasks.toArray(taskArray));
 
 687         assertTrue(executor.runAll(MAX_REQUESTS));
 
 688         assertTrue(result.isDone());
 
 689         assertSame(outcome, result.get());
 
 691         // second task completes, others do not
 
 693         tasks.add(() -> new CompletableFuture<>());
 
 694         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 695         tasks.add(() -> new CompletableFuture<>());
 
 697         result = oper.anyOf(tasks);
 
 698         assertTrue(executor.runAll(MAX_REQUESTS));
 
 699         assertTrue(result.isDone());
 
 700         assertSame(outcome, result.get());
 
 702         // third task completes, others do not
 
 704         tasks.add(() -> new CompletableFuture<>());
 
 705         tasks.add(() -> new CompletableFuture<>());
 
 706         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 708         result = oper.anyOf(tasks);
 
 709         assertTrue(executor.runAll(MAX_REQUESTS));
 
 710         assertTrue(result.isDone());
 
 711         assertSame(outcome, result.get());
 
 715      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
 
 718     @SuppressWarnings("unchecked")
 
 719     public void testAnyOfEdge() throws Exception {
 
 720         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 722         // zero items: check both using a list and using an array
 
 723         assertNull(oper.anyOf(tasks));
 
 724         assertNull(oper.anyOf());
 
 726         // one item: : check both using a list and using an array
 
 727         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 728         tasks.add(() -> future1);
 
 730         assertSame(future1, oper.anyOf(tasks));
 
 731         assertSame(future1, oper.anyOf(() -> future1));
 
 735     public void testAllOfArray() throws Exception {
 
 736         final OperationOutcome outcome = params.makeOutcome(null);
 
 738         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 739         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 740         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 742         @SuppressWarnings("unchecked")
 
 743         CompletableFuture<OperationOutcome> result =
 
 744                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
 
 746         assertTrue(executor.runAll(MAX_REQUESTS));
 
 747         assertFalse(result.isDone());
 
 748         future1.complete(outcome);
 
 750         // complete 3 before 2
 
 751         assertTrue(executor.runAll(MAX_REQUESTS));
 
 752         assertFalse(result.isDone());
 
 753         future3.complete(outcome);
 
 755         assertTrue(executor.runAll(MAX_REQUESTS));
 
 756         assertFalse(result.isDone());
 
 757         future2.complete(outcome);
 
 759         // all of them are now done
 
 760         assertTrue(executor.runAll(MAX_REQUESTS));
 
 761         assertTrue(result.isDone());
 
 762         assertSame(outcome, result.get());
 
 766     public void testAllOfList() throws Exception {
 
 767         final OperationOutcome outcome = params.makeOutcome(null);
 
 769         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 770         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 771         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 773         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 774         tasks.add(() -> future1);
 
 775         tasks.add(() -> future2);
 
 776         tasks.add(() -> null);
 
 777         tasks.add(() -> future3);
 
 779         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
 781         assertTrue(executor.runAll(MAX_REQUESTS));
 
 782         assertFalse(result.isDone());
 
 783         future1.complete(outcome);
 
 785         // complete 3 before 2
 
 786         assertTrue(executor.runAll(MAX_REQUESTS));
 
 787         assertFalse(result.isDone());
 
 788         future3.complete(outcome);
 
 790         assertTrue(executor.runAll(MAX_REQUESTS));
 
 791         assertFalse(result.isDone());
 
 792         future2.complete(outcome);
 
 794         // all of them are now done
 
 795         assertTrue(executor.runAll(MAX_REQUESTS));
 
 796         assertTrue(result.isDone());
 
 797         assertSame(outcome, result.get());
 
 801      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
 
 804     @SuppressWarnings("unchecked")
 
 805     public void testAllOfEdge() throws Exception {
 
 806         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 808         // zero items: check both using a list and using an array
 
 809         assertNull(oper.allOf(tasks));
 
 810         assertNull(oper.allOf());
 
 812         // one item: : check both using a list and using an array
 
 813         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 814         tasks.add(() -> future1);
 
 816         assertSame(future1, oper.allOf(tasks));
 
 817         assertSame(future1, oper.allOf(() -> future1));
 
 821     public void testAttachFutures() throws Exception {
 
 822         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 824         // third task throws an exception during construction
 
 825         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 826         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 827         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 828         tasks.add(() -> future1);
 
 829         tasks.add(() -> future2);
 
 831             throw new IllegalStateException(EXPECTED_EXCEPTION);
 
 833         tasks.add(() -> future3);
 
 835         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
 
 837         // should have canceled the first two, but not the last
 
 838         assertTrue(future1.isCancelled());
 
 839         assertTrue(future2.isCancelled());
 
 840         assertFalse(future3.isCancelled());
 
 844     public void testCombineOutcomes() throws Exception {
 
 846         verifyOutcomes(0, PolicyResult.SUCCESS);
 
 847         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
 
 849         // maximum is in different positions
 
 850         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
 
 851         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
 
 852         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
 
 854         // null outcome - takes precedence over a success
 
 855         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 856         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
 
 857         tasks.add(() -> CompletableFuture.completedFuture(null));
 
 858         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
 
 859         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
 861         assertTrue(executor.runAll(MAX_REQUESTS));
 
 862         assertTrue(result.isDone());
 
 863         assertNull(result.get());
 
 865         // one throws an exception during execution
 
 866         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
 
 869         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
 
 870         tasks.add(() -> CompletableFuture.failedFuture(except));
 
 871         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
 
 872         result = oper.allOf(tasks);
 
 874         assertTrue(executor.runAll(MAX_REQUESTS));
 
 875         assertTrue(result.isCompletedExceptionally());
 
 876         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
 
 880      * Tests both flavors of sequence(), because one invokes the other.
 
 883     public void testSequence() throws Exception {
 
 884         final OperationOutcome outcome = params.makeOutcome(null);
 
 886         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 887         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 888         tasks.add(() -> null);
 
 889         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 890         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 892         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
 
 893         assertTrue(executor.runAll(MAX_REQUESTS));
 
 894         assertTrue(result.isDone());
 
 895         assertSame(outcome, result.get());
 
 897         // repeat using array form
 
 898         @SuppressWarnings("unchecked")
 
 899         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
 
 900         result = oper.sequence(tasks.toArray(taskArray));
 
 901         assertTrue(executor.runAll(MAX_REQUESTS));
 
 902         assertTrue(result.isDone());
 
 903         assertSame(outcome, result.get());
 
 905         // second task fails, third should not run
 
 906         OperationOutcome failure = params.makeOutcome(null);
 
 907         failure.setResult(PolicyResult.FAILURE);
 
 909         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 910         tasks.add(() -> CompletableFuture.completedFuture(failure));
 
 911         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 913         result = oper.sequence(tasks);
 
 914         assertTrue(executor.runAll(MAX_REQUESTS));
 
 915         assertTrue(result.isDone());
 
 916         assertSame(failure, result.get());
 
 920      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
 
 923     @SuppressWarnings("unchecked")
 
 924     public void testSequenceEdge() throws Exception {
 
 925         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 927         // zero items: check both using a list and using an array
 
 928         assertNull(oper.sequence(tasks));
 
 929         assertNull(oper.sequence());
 
 931         // one item: : check both using a list and using an array
 
 932         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 933         tasks.add(() -> future1);
 
 935         assertSame(future1, oper.sequence(tasks));
 
 936         assertSame(future1, oper.sequence(() -> future1));
 
 939     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
 
 940         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 942         OperationOutcome expectedOutcome = null;
 
 944         for (int count = 0; count < results.length; ++count) {
 
 945             OperationOutcome outcome = params.makeOutcome(null);
 
 946             outcome.setResult(results[count]);
 
 947             tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 949             if (count == expected) {
 
 950                 expectedOutcome = outcome;
 
 954         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
 956         assertTrue(executor.runAll(MAX_REQUESTS));
 
 957         assertTrue(result.isDone());
 
 958         assertSame(expectedOutcome, result.get());
 
 962     public void testDetmPriority() throws CoderException {
 
 963         assertEquals(1, oper.detmPriority(null));
 
 965         OperationOutcome outcome = params.makeOutcome(null);
 
 967         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
 
 968                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
 
 969                         PolicyResult.FAILURE_EXCEPTION, 6);
 
 971         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
 
 972             outcome.setResult(ent.getKey());
 
 973             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
 
 977          * Test null result. We can't actually set it to null, because the set() method
 
 978          * won't allow it. Instead, we decode it from a structure.
 
 980         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
 
 981         assertEquals(1, oper.detmPriority(outcome));
 
 985      * Tests callbackStarted() when the pipeline has already been stopped.
 
 988     public void testCallbackStartedNotRunning() {
 
 989         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
 
 992          * arrange to stop the controller when the start-callback is invoked, but capture
 
 995         params = params.toBuilder().startCallback(oper -> {
 
 997             future.get().cancel(false);
 
1000         // new params, thus need a new operation
 
1001         oper = new MyOper();
 
1003         future.set(oper.start());
 
1004         assertTrue(executor.runAll(MAX_REQUESTS));
 
1006         // should have only run once
 
1007         assertEquals(1, numStart);
 
1011      * Tests callbackCompleted() when the pipeline has already been stopped.
 
1014     public void testCallbackCompletedNotRunning() {
 
1015         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
 
1017         // arrange to stop the controller when the start-callback is invoked
 
1018         params = params.toBuilder().startCallback(oper -> {
 
1019             future.get().cancel(false);
 
1022         // new params, thus need a new operation
 
1023         oper = new MyOper();
 
1025         future.set(oper.start());
 
1026         assertTrue(executor.runAll(MAX_REQUESTS));
 
1028         // should not have been set
 
1030         assertEquals(0, numEnd);
 
1034     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
 
1035         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
 
1037         OperationOutcome outcome;
 
1039         outcome = new OperationOutcome();
 
1040         oper.setOutcome(outcome, timex);
 
1041         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1042         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
 
1044         outcome = new OperationOutcome();
 
1045         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
 
1046         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1047         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
 
1051     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
 
1052         OperationOutcome outcome;
 
1054         outcome = new OperationOutcome();
 
1055         oper.setOutcome(outcome, PolicyResult.SUCCESS);
 
1056         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
 
1057         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
 
1059         oper.setOutcome(outcome, PolicyResult.SUCCESS);
 
1060         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
 
1061         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
 
1063         for (PolicyResult result : FAILURE_RESULTS) {
 
1064             outcome = new OperationOutcome();
 
1065             oper.setOutcome(outcome, result);
 
1066             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1067             assertEquals(result.toString(), result, outcome.getResult());
 
1072     public void testIsTimeout() {
 
1073         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
 
1075         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
 
1076         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
 
1077         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
 
1078         assertFalse(oper.isTimeout(new CompletionException(null)));
 
1079         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
 
1081         assertTrue(oper.isTimeout(timex));
 
1082         assertTrue(oper.isTimeout(new CompletionException(timex)));
 
1086     public void testLogMessage() {
 
1087         final String infraStr = SINK_INFRA.toString();
 
1089         // log structured data
 
1090         appender.clearExtractions();
 
1091         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
 
1092         List<String> output = appender.getExtracted();
 
1093         assertEquals(1, output.size());
 
1095         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
 
1096                         .contains("{\n  \"text\": \"my-text\"\n}");
 
1098         // repeat with a response
 
1099         appender.clearExtractions();
 
1100         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
 
1101         output = appender.getExtracted();
 
1102         assertEquals(1, output.size());
 
1104         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
 
1105                         .contains("{\n  \"text\": \"my-text\"\n}");
 
1107         // log a plain string
 
1108         appender.clearExtractions();
 
1109         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
 
1110         output = appender.getExtracted();
 
1111         assertEquals(1, output.size());
 
1112         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
 
1114         // log a null request
 
1115         appender.clearExtractions();
 
1116         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
 
1117         output = appender.getExtracted();
 
1118         assertEquals(1, output.size());
 
1120         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
 
1122         // generate exception from coder
 
1123         setOperCoderException();
 
1125         appender.clearExtractions();
 
1126         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
 
1127         output = appender.getExtracted();
 
1128         assertEquals(2, output.size());
 
1129         assertThat(output.get(0)).contains("cannot pretty-print request");
 
1130         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
 
1132         // repeat with a response
 
1133         appender.clearExtractions();
 
1134         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
 
1135         output = appender.getExtracted();
 
1136         assertEquals(2, output.size());
 
1137         assertThat(output.get(0)).contains("cannot pretty-print response");
 
1138         assertThat(output.get(1)).contains(MY_SOURCE);
 
1142     public void testGetRetry() {
 
1143         assertEquals(0, oper.getRetry(null));
 
1144         assertEquals(10, oper.getRetry(10));
 
1148     public void testGetRetryWait() {
 
1149         // need an operator that doesn't override the retry time
 
1150         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
 
1151         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
 
1155     public void testGetTargetEntity() {
 
1156         // get it from the params
 
1157         assertEquals(MY_TARGET_ENTITY, oper.getTargetEntity());
 
1159         // now get it from the properties
 
1160         oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, "entityX");
 
1161         assertEquals("entityX", oper.getTargetEntity());
 
1165     public void testGetTimeOutMs() {
 
1166         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
 
1168         params = params.toBuilder().timeoutSec(null).build();
 
1170         // new params, thus need a new operation
 
1171         oper = new MyOper();
 
1173         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
 
1176     private void starter(OperationOutcome oper) {
 
1178         tstart = oper.getStart();
 
1183     private void completer(OperationOutcome oper) {
 
1190      * Gets a function that does nothing.
 
1192      * @param <T> type of input parameter expected by the function
 
1193      * @return a function that does nothing
 
1195     private <T> Consumer<T> noop() {
 
1200     private OperationOutcome makeSuccess() {
 
1201         OperationOutcome outcome = params.makeOutcome(null);
 
1202         outcome.setResult(PolicyResult.SUCCESS);
 
1207     private OperationOutcome makeFailure() {
 
1208         OperationOutcome outcome = params.makeOutcome(null);
 
1209         outcome.setResult(PolicyResult.FAILURE);
 
1217      * @param testName test name
 
1218      * @param expectedCallbacks number of callbacks expected
 
1219      * @param expectedOperations number of operation invocations expected
 
1220      * @param expectedResult expected outcome
 
1222     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
 
1223                     PolicyResult expectedResult) {
 
1225         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
 
1231      * @param testName test name
 
1232      * @param expectedCallbacks number of callbacks expected
 
1233      * @param expectedOperations number of operation invocations expected
 
1234      * @param expectedResult expected outcome
 
1235      * @param manipulator function to modify the future returned by
 
1236      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
 
1237      *        in the executor are run
 
1239     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
 
1240                     Consumer<CompletableFuture<OperationOutcome>> manipulator) {
 
1248         CompletableFuture<OperationOutcome> future = oper.start();
 
1250         manipulator.accept(future);
 
1252         assertTrue(testName, executor.runAll(MAX_REQUESTS));
 
1254         assertEquals(testName, expectedCallbacks, numStart);
 
1255         assertEquals(testName, expectedCallbacks, numEnd);
 
1257         if (expectedCallbacks > 0) {
 
1258             assertNotNull(testName, opstart);
 
1259             assertNotNull(testName, opend);
 
1260             assertEquals(testName, expectedResult, opend.getResult());
 
1262             assertSame(testName, tstart, opstart.getStart());
 
1263             assertSame(testName, tstart, opend.getStart());
 
1266                 assertTrue(future.isDone());
 
1267                 assertEquals(testName, opend, future.get());
 
1269                 // "start" is never final
 
1270                 for (OperationOutcome outcome : starts) {
 
1271                     assertFalse(testName, outcome.isFinalOutcome());
 
1274                 // only the last "complete" is final
 
1275                 assertTrue(testName, ends.removeLast().isFinalOutcome());
 
1277                 for (OperationOutcome outcome : ends) {
 
1278                     assertFalse(outcome.isFinalOutcome());
 
1281             } catch (InterruptedException | ExecutionException e) {
 
1282                 throw new IllegalStateException(e);
 
1285             if (expectedOperations > 0) {
 
1286                 assertNotNull(testName, oper.getSubRequestId());
 
1287                 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
 
1288                 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
 
1292         assertEquals(testName, expectedOperations, oper.getCount());
 
1296      * Creates a new {@link #oper} whose coder will throw an exception.
 
1298     private void setOperCoderException() {
 
1299         oper = new MyOper() {
 
1301             protected Coder getCoder() {
 
1302                 return new StandardCoder() {
 
1304                     public String encode(Object object, boolean pretty) throws CoderException {
 
1305                         throw new CoderException(EXPECTED_EXCEPTION);
 
1314     public static class MyData {
 
1315         private String text = TEXT;
 
1319     private class MyOper extends OperationPartial {
 
1321         private int count = 0;
 
1324         private boolean genException;
 
1326         private int maxFailures = 0;
 
1328         private CompletableFuture<OperationOutcome> preProc;
 
1332             super(OperationPartialTest.this.params, config, PROP_NAMES);
 
1336         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
 
1339                 throw new IllegalStateException(EXPECTED_EXCEPTION);
 
1342             operation.setSubRequestId(String.valueOf(attempt));
 
1344             if (count > maxFailures) {
 
1345                 operation.setResult(PolicyResult.SUCCESS);
 
1347                 operation.setResult(PolicyResult.FAILURE);
 
1354         protected long getRetryWaitMs() {
 
1356              * Sleep timers run in the background, but we want to control things via the
 
1357              * "executor", thus we avoid sleep timers altogether by simply returning 0.
 
1363         protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
 
1364             return (preProc != null ? preProc : super.startPreprocessorAsync());