2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 
  24 import static org.junit.Assert.assertEquals;
 
  25 import static org.junit.Assert.assertFalse;
 
  26 import static org.junit.Assert.assertNotNull;
 
  27 import static org.junit.Assert.assertNull;
 
  28 import static org.junit.Assert.assertSame;
 
  29 import static org.junit.Assert.assertTrue;
 
  30 import static org.mockito.Mockito.never;
 
  31 import static org.mockito.Mockito.spy;
 
  32 import static org.mockito.Mockito.times;
 
  33 import static org.mockito.Mockito.verify;
 
  35 import java.time.Instant;
 
  36 import java.util.Arrays;
 
  37 import java.util.LinkedList;
 
  38 import java.util.List;
 
  40 import java.util.Map.Entry;
 
  41 import java.util.Queue;
 
  42 import java.util.TreeMap;
 
  43 import java.util.UUID;
 
  44 import java.util.concurrent.CompletableFuture;
 
  45 import java.util.concurrent.CompletionException;
 
  46 import java.util.concurrent.CountDownLatch;
 
  47 import java.util.concurrent.ExecutionException;
 
  48 import java.util.concurrent.Executor;
 
  49 import java.util.concurrent.ForkJoinPool;
 
  50 import java.util.concurrent.Future;
 
  51 import java.util.concurrent.TimeUnit;
 
  52 import java.util.concurrent.TimeoutException;
 
  53 import java.util.concurrent.atomic.AtomicBoolean;
 
  54 import java.util.concurrent.atomic.AtomicInteger;
 
  55 import java.util.concurrent.atomic.AtomicReference;
 
  56 import java.util.function.Consumer;
 
  57 import java.util.function.Function;
 
  58 import java.util.stream.Collectors;
 
  61 import org.junit.Before;
 
  62 import org.junit.Test;
 
  63 import org.onap.policy.controlloop.ControlLoopOperation;
 
  64 import org.onap.policy.controlloop.VirtualControlLoopEvent;
 
  65 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 
  66 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
 
  67 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  68 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 
  69 import org.onap.policy.controlloop.policy.PolicyResult;
 
  71 public class OperatorPartialTest {
 
  72     private static final int MAX_PARALLEL_REQUESTS = 10;
 
  73     private static final String EXPECTED_EXCEPTION = "expected exception";
 
  74     private static final String ACTOR = "my-actor";
 
  75     private static final String OPERATOR = "my-operator";
 
  76     private static final String TARGET = "my-target";
 
  77     private static final int TIMEOUT = 1000;
 
  78     private static final UUID REQ_ID = UUID.randomUUID();
 
  80     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
 
  81                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
 
  83     private VirtualControlLoopEvent event;
 
  84     private Map<String, Object> config;
 
  85     private ControlLoopEventContext context;
 
  86     private MyExec executor;
 
  87     private ControlLoopOperationParams params;
 
  94     private Instant tstart;
 
  96     private OperationOutcome opstart;
 
  97     private OperationOutcome opend;
 
 100      * Initializes the fields, including {@link #oper}.
 
 103     public void setUp() {
 
 104         event = new VirtualControlLoopEvent();
 
 105         event.setRequestId(REQ_ID);
 
 107         config = new TreeMap<>();
 
 108         context = new ControlLoopEventContext(event);
 
 109         executor = new MyExec();
 
 111         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
 
 112                         .executor(executor).actor(ACTOR).operation(OPERATOR).timeoutSec(TIMEOUT)
 
 113                         .startCallback(this::starter).targetEntity(TARGET).build();
 
 116         oper.configure(new TreeMap<>());
 
 126     public void testOperatorPartial_testGetActorName_testGetName() {
 
 127         assertEquals(ACTOR, oper.getActorName());
 
 128         assertEquals(OPERATOR, oper.getName());
 
 129         assertEquals(ACTOR + "." + OPERATOR, oper.getFullName());
 
 133     public void testGetBlockingExecutor() throws InterruptedException {
 
 134         CountDownLatch latch = new CountDownLatch(1);
 
 137          * Use an operator that doesn't override getBlockingExecutor().
 
 139         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {};
 
 140         oper2.getBlockingExecutor().execute(() -> latch.countDown());
 
 142         assertTrue(latch.await(5, TimeUnit.SECONDS));
 
 146     public void testDoConfigure() {
 
 147         oper = spy(new MyOper());
 
 149         oper.configure(config);
 
 150         verify(oper).configure(config);
 
 152         // repeat - SHOULD be run again
 
 153         oper.configure(config);
 
 154         verify(oper, times(2)).configure(config);
 
 158     public void testDoStart() {
 
 159         oper = spy(new MyOper());
 
 161         oper.configure(config);
 
 164         verify(oper).doStart();
 
 166         // others should not have been invoked
 
 167         verify(oper, never()).doStop();
 
 168         verify(oper, never()).doShutdown();
 
 172     public void testDoStop() {
 
 173         oper = spy(new MyOper());
 
 175         oper.configure(config);
 
 179         verify(oper).doStop();
 
 181         // should not have been re-invoked
 
 182         verify(oper).doStart();
 
 184         // others should not have been invoked
 
 185         verify(oper, never()).doShutdown();
 
 189     public void testDoShutdown() {
 
 190         oper = spy(new MyOper());
 
 192         oper.configure(config);
 
 196         verify(oper).doShutdown();
 
 198         // should not have been re-invoked
 
 199         verify(oper).doStart();
 
 201         // others should not have been invoked
 
 202         verify(oper, never()).doStop();
 
 206     public void testStartOperation() {
 
 207         verifyRun("testStartOperation", 1, 1, PolicyResult.SUCCESS);
 
 211      * Tests startOperation() when the operator is not running.
 
 214     public void testStartOperationNotRunning() {
 
 215         // use a new operator, one that hasn't been started yet
 
 217         oper.configure(new TreeMap<>());
 
 219         assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
 
 223      * Tests startOperation() when the operation has a preprocessor.
 
 226     public void testStartOperationWithPreprocessor() {
 
 227         AtomicInteger count = new AtomicInteger();
 
 229         CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
 
 230             count.incrementAndGet();
 
 231             return makeSuccess();
 
 234         oper.setPreProcessor(preproc);
 
 236         verifyRun("testStartOperationWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
 
 238         assertEquals(1, count.get());
 
 242      * Tests startOperation() with multiple running requests.
 
 245     public void testStartOperationMultiple() {
 
 246         for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
 
 247             oper.startOperation(params);
 
 250         assertTrue(executor.runAll());
 
 252         assertNotNull(opstart);
 
 253         assertNotNull(opend);
 
 254         assertEquals(PolicyResult.SUCCESS, opend.getResult());
 
 256         assertEquals(MAX_PARALLEL_REQUESTS, numStart);
 
 257         assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
 
 258         assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
 
 262      * Tests startPreprocessor() when the preprocessor returns a failure.
 
 265     public void testStartPreprocessorFailure() {
 
 266         oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure()));
 
 268         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
 
 272      * Tests startPreprocessor() when the preprocessor throws an exception.
 
 275     public void testStartPreprocessorException() {
 
 276         // arrange for the preprocessor to throw an exception
 
 277         oper.setPreProcessor(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
 
 279         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
 
 283      * Tests startPreprocessor() when the pipeline is not running.
 
 286     public void testStartPreprocessorNotRunning() {
 
 287         // arrange for the preprocessor to return success, which will be ignored
 
 288         oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess()));
 
 290         oper.startOperation(params).cancel(false);
 
 291         assertTrue(executor.runAll());
 
 296         assertEquals(0, numStart);
 
 297         assertEquals(0, oper.getCount());
 
 298         assertEquals(0, numEnd);
 
 302      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
 
 305     public void testStartPreprocessorBuilderException() {
 
 306         oper = new MyOper() {
 
 308             protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
 
 309                 throw new IllegalStateException(EXPECTED_EXCEPTION);
 
 313         oper.configure(new TreeMap<>());
 
 316         assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
 
 318         // should be nothing in the queue
 
 319         assertEquals(0, executor.getQueueLength());
 
 323     public void testStartPreprocessorAsync() {
 
 324         assertNull(oper.startPreprocessorAsync(params));
 
 328     public void testStartOperationAsync() {
 
 329         oper.startOperation(params);
 
 330         assertTrue(executor.runAll());
 
 332         assertEquals(1, oper.getCount());
 
 336     public void testIsSuccess() {
 
 337         OperationOutcome outcome = new OperationOutcome();
 
 339         outcome.setResult(PolicyResult.SUCCESS);
 
 340         assertTrue(oper.isSuccess(outcome));
 
 342         for (PolicyResult failure : FAILURE_RESULTS) {
 
 343             outcome.setResult(failure);
 
 344             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
 
 349     public void testIsActorFailed() {
 
 350         assertFalse(oper.isActorFailed(null));
 
 352         OperationOutcome outcome = params.makeOutcome();
 
 355         outcome.setResult(PolicyResult.SUCCESS);
 
 356         assertFalse(oper.isActorFailed(outcome));
 
 358         outcome.setResult(PolicyResult.FAILURE_RETRIES);
 
 359         assertFalse(oper.isActorFailed(outcome));
 
 362         outcome.setResult(PolicyResult.FAILURE);
 
 365         outcome.setActor(TARGET);
 
 366         assertFalse(oper.isActorFailed(outcome));
 
 367         outcome.setActor(null);
 
 368         assertFalse(oper.isActorFailed(outcome));
 
 369         outcome.setActor(ACTOR);
 
 371         // incorrect operation
 
 372         outcome.setOperation(TARGET);
 
 373         assertFalse(oper.isActorFailed(outcome));
 
 374         outcome.setOperation(null);
 
 375         assertFalse(oper.isActorFailed(outcome));
 
 376         outcome.setOperation(OPERATOR);
 
 379         assertTrue(oper.isActorFailed(outcome));
 
 383     public void testDoOperation() {
 
 385          * Use an operator that doesn't override doOperation().
 
 387         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {
 
 389             protected Executor getBlockingExecutor() {
 
 394         oper2.configure(new TreeMap<>());
 
 397         oper2.startOperation(params);
 
 398         assertTrue(executor.runAll());
 
 400         assertNotNull(opend);
 
 401         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
 
 405     public void testTimeout() throws Exception {
 
 407         // use a real executor
 
 408         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
 
 410         // trigger timeout very quickly
 
 411         oper = new MyOper() {
 
 413             protected long getTimeOutMillis(Integer timeoutSec) {
 
 418             protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params,
 
 419                             int attempt, OperationOutcome outcome) {
 
 421                 OperationOutcome outcome2 = params.makeOutcome();
 
 422                 outcome2.setResult(PolicyResult.SUCCESS);
 
 425                  * Create an incomplete future that will timeout after the operation's
 
 426                  * timeout. If it fires before the other timer, then it will return a
 
 429                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
 
 430                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
 
 431                                 params.getExecutor());
 
 437         oper.configure(new TreeMap<>());
 
 440         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.startOperation(params).get().getResult());
 
 444      * Verifies that the timer doesn't encompass the preprocessor and doesn't stop the
 
 445      * operation once the preprocessor completes.
 
 448     public void testTimeoutInPreprocessor() throws Exception {
 
 450         // use a real executor
 
 451         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
 
 453         // trigger timeout very quickly
 
 454         oper = new MyOper() {
 
 456             protected long getTimeOutMillis(Integer timeoutSec) {
 
 461             protected Executor getBlockingExecutor() {
 
 463                     Thread thread = new Thread(command);
 
 469             protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
 
 471                 OperationOutcome outcome = makeSuccess();
 
 474                  * Create an incomplete future that will timeout after the operation's
 
 475                  * timeout. If it fires before the other timer, then it will return a
 
 478                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
 
 479                 future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome,
 
 480                                 params.getExecutor());
 
 486         oper.configure(new TreeMap<>());
 
 489         OperationOutcome result = oper.startOperation(params).get();
 
 490         assertEquals(PolicyResult.SUCCESS, result.getResult());
 
 492         assertNotNull(opstart);
 
 493         assertNotNull(opend);
 
 494         assertEquals(PolicyResult.SUCCESS, opend.getResult());
 
 496         assertEquals(1, numStart);
 
 497         assertEquals(1, oper.getCount());
 
 498         assertEquals(1, numEnd);
 
 502      * Tests retry functions, when the count is set to zero and retries are exhausted.
 
 505     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
 
 506         params = params.toBuilder().retry(0).build();
 
 507         oper.setMaxFailures(10);
 
 509         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
 
 513      * Tests retry functions, when the count is null and retries are exhausted.
 
 516     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
 
 517         params = params.toBuilder().retry(null).build();
 
 518         oper.setMaxFailures(10);
 
 520         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
 
 524      * Tests retry functions, when retries are exhausted.
 
 527     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
 
 528         final int maxRetries = 3;
 
 529         params = params.toBuilder().retry(maxRetries).build();
 
 530         oper.setMaxFailures(10);
 
 532         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
 
 533                         PolicyResult.FAILURE_RETRIES);
 
 537      * Tests retry functions, when a success follows some retries.
 
 540     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
 
 541         params = params.toBuilder().retry(10).build();
 
 543         final int maxFailures = 3;
 
 544         oper.setMaxFailures(maxFailures);
 
 546         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
 
 547                         PolicyResult.SUCCESS);
 
 551      * Tests retry functions, when the outcome is {@code null}.
 
 554     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
 
 556         // arrange to return null from doOperation()
 
 557         oper = new MyOper() {
 
 559             protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt,
 
 560                             OperationOutcome operation) {
 
 563                 super.doOperation(params, attempt, operation);
 
 568         oper.configure(new TreeMap<>());
 
 571         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
 
 575     public void testIsSameOperation() {
 
 576         assertFalse(oper.isSameOperation(null));
 
 578         OperationOutcome outcome = params.makeOutcome();
 
 580         // wrong actor - should be false
 
 581         outcome.setActor(null);
 
 582         assertFalse(oper.isSameOperation(outcome));
 
 583         outcome.setActor(TARGET);
 
 584         assertFalse(oper.isSameOperation(outcome));
 
 585         outcome.setActor(ACTOR);
 
 587         // wrong operation - should be null
 
 588         outcome.setOperation(null);
 
 589         assertFalse(oper.isSameOperation(outcome));
 
 590         outcome.setOperation(TARGET);
 
 591         assertFalse(oper.isSameOperation(outcome));
 
 592         outcome.setOperation(OPERATOR);
 
 594         assertTrue(oper.isSameOperation(outcome));
 
 598      * Tests handleFailure() when the outcome is a success.
 
 601     public void testHandlePreprocessorFailureTrue() {
 
 602         oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess()));
 
 603         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
 
 607      * Tests handleFailure() when the outcome is <i>not</i> a success.
 
 610     public void testHandlePreprocessorFailureFalse() throws Exception {
 
 611         oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure()));
 
 612         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
 
 616      * Tests handleFailure() when the outcome is {@code null}.
 
 619     public void testHandlePreprocessorFailureNull() throws Exception {
 
 620         // arrange to return null from the preprocessor
 
 621         oper.setPreProcessor(CompletableFuture.completedFuture(null));
 
 623         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
 
 627     public void testFromException() {
 
 628         // arrange to generate an exception when operation runs
 
 629         oper.setGenException(true);
 
 631         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
 
 635      * Tests fromException() when there is no exception.
 
 638     public void testFromExceptionNoExcept() {
 
 639         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
 
 643      * Tests both flavors of anyOf(), because one invokes the other.
 
 646     public void testAnyOf() throws Exception {
 
 647         // first task completes, others do not
 
 648         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
 
 650         final OperationOutcome outcome = params.makeOutcome();
 
 652         tasks.add(CompletableFuture.completedFuture(outcome));
 
 653         tasks.add(new CompletableFuture<>());
 
 654         tasks.add(new CompletableFuture<>());
 
 656         CompletableFuture<OperationOutcome> result = oper.anyOf(params, tasks);
 
 657         assertTrue(executor.runAll());
 
 659         assertTrue(result.isDone());
 
 660         assertSame(outcome, result.get());
 
 662         // second task completes, others do not
 
 663         tasks = new LinkedList<>();
 
 665         tasks.add(new CompletableFuture<>());
 
 666         tasks.add(CompletableFuture.completedFuture(outcome));
 
 667         tasks.add(new CompletableFuture<>());
 
 669         result = oper.anyOf(params, tasks);
 
 670         assertTrue(executor.runAll());
 
 672         assertTrue(result.isDone());
 
 673         assertSame(outcome, result.get());
 
 675         // third task completes, others do not
 
 676         tasks = new LinkedList<>();
 
 678         tasks.add(new CompletableFuture<>());
 
 679         tasks.add(new CompletableFuture<>());
 
 680         tasks.add(CompletableFuture.completedFuture(outcome));
 
 682         result = oper.anyOf(params, tasks);
 
 683         assertTrue(executor.runAll());
 
 685         assertTrue(result.isDone());
 
 686         assertSame(outcome, result.get());
 
 690      * Tests both flavors of allOf(), because one invokes the other.
 
 693     public void testAllOf() throws Exception {
 
 694         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
 
 696         final OperationOutcome outcome = params.makeOutcome();
 
 698         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 699         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 700         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 706         CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
 
 708         assertTrue(executor.runAll());
 
 709         assertFalse(result.isDone());
 
 710         future1.complete(outcome);
 
 712         // complete 3 before 2
 
 713         assertTrue(executor.runAll());
 
 714         assertFalse(result.isDone());
 
 715         future3.complete(outcome);
 
 717         assertTrue(executor.runAll());
 
 718         assertFalse(result.isDone());
 
 719         future2.complete(outcome);
 
 721         // all of them are now done
 
 722         assertTrue(executor.runAll());
 
 723         assertTrue(result.isDone());
 
 724         assertSame(outcome, result.get());
 
 728     public void testCombineOutcomes() throws Exception {
 
 730         verifyOutcomes(0, PolicyResult.SUCCESS);
 
 731         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
 
 733         // maximum is in different positions
 
 734         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
 
 735         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
 
 736         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
 
 739         final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
 
 740         tasks.add(CompletableFuture.completedFuture(null));
 
 741         CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
 
 743         assertTrue(executor.runAll());
 
 744         assertTrue(result.isDone());
 
 745         assertNull(result.get());
 
 747         // one throws an exception during execution
 
 748         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
 
 751         tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
 
 752         tasks.add(CompletableFuture.failedFuture(except));
 
 753         tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
 
 754         result = oper.allOf(params, tasks);
 
 756         assertTrue(executor.runAll());
 
 757         assertTrue(result.isCompletedExceptionally());
 
 758         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
 
 761     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
 
 762         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
 
 765         OperationOutcome expectedOutcome = null;
 
 767         for (int count = 0; count < results.length; ++count) {
 
 768             OperationOutcome outcome = params.makeOutcome();
 
 769             outcome.setResult(results[count]);
 
 770             tasks.add(CompletableFuture.completedFuture(outcome));
 
 772             if (count == expected) {
 
 773                 expectedOutcome = outcome;
 
 777         CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
 
 779         assertTrue(executor.runAll());
 
 780         assertTrue(result.isDone());
 
 781         assertSame(expectedOutcome, result.get());
 
 784     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
 
 785                     final OperationOutcome taskOutcome) {
 
 787         return outcome -> CompletableFuture.completedFuture(taskOutcome);
 
 791     public void testDetmPriority() {
 
 792         assertEquals(1, oper.detmPriority(null));
 
 794         OperationOutcome outcome = params.makeOutcome();
 
 796         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
 
 797                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
 
 798                         PolicyResult.FAILURE_EXCEPTION, 6);
 
 800         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
 
 801             outcome.setResult(ent.getKey());
 
 802             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
 
 807      * Tests doTask(Future) when the controller is not running.
 
 810     public void testDoTaskFutureNotRunning() throws Exception {
 
 811         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
 
 813         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 814         controller.complete(params.makeOutcome());
 
 816         CompletableFuture<OperationOutcome> future =
 
 817                         oper.doTask(params, controller, false, params.makeOutcome(), taskFuture);
 
 818         assertFalse(future.isDone());
 
 819         assertTrue(executor.runAll());
 
 821         // should not have run the task
 
 822         assertFalse(future.isDone());
 
 824         // should have canceled the task future
 
 825         assertTrue(taskFuture.isCancelled());
 
 829      * Tests doTask(Future) when the previous outcome was successful.
 
 832     public void testDoTaskFutureSuccess() throws Exception {
 
 833         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
 
 834         final OperationOutcome taskOutcome = params.makeOutcome();
 
 836         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 838         CompletableFuture<OperationOutcome> future =
 
 839                         oper.doTask(params, controller, true, params.makeOutcome(), taskFuture);
 
 841         taskFuture.complete(taskOutcome);
 
 842         assertTrue(executor.runAll());
 
 844         assertTrue(future.isDone());
 
 845         assertSame(taskOutcome, future.get());
 
 847         // controller should not be done yet
 
 848         assertFalse(controller.isDone());
 
 852      * Tests doTask(Future) when the previous outcome was failed.
 
 855     public void testDoTaskFutureFailure() throws Exception {
 
 856         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
 
 857         final OperationOutcome failedOutcome = params.makeOutcome();
 
 858         failedOutcome.setResult(PolicyResult.FAILURE);
 
 860         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 862         CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, failedOutcome, taskFuture);
 
 863         assertFalse(future.isDone());
 
 864         assertTrue(executor.runAll());
 
 866         // should not have run the task
 
 867         assertFalse(future.isDone());
 
 869         // should have canceled the task future
 
 870         assertTrue(taskFuture.isCancelled());
 
 872         // controller SHOULD be done now
 
 873         assertTrue(controller.isDone());
 
 874         assertSame(failedOutcome, controller.get());
 
 878      * Tests doTask(Future) when the previous outcome was failed, but not checking
 
 882     public void testDoTaskFutureUncheckedFailure() throws Exception {
 
 883         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
 
 884         final OperationOutcome failedOutcome = params.makeOutcome();
 
 885         failedOutcome.setResult(PolicyResult.FAILURE);
 
 887         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 889         CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, failedOutcome, taskFuture);
 
 890         assertFalse(future.isDone());
 
 893         OperationOutcome taskOutcome = params.makeOutcome();
 
 894         taskFuture.complete(taskOutcome);
 
 896         assertTrue(executor.runAll());
 
 898         // should have run the task
 
 899         assertTrue(future.isDone());
 
 901         assertTrue(future.isDone());
 
 902         assertSame(taskOutcome, future.get());
 
 904         // controller should not be done yet
 
 905         assertFalse(controller.isDone());
 
 909      * Tests doTask(Function) when the controller is not running.
 
 912     public void testDoTaskFunctionNotRunning() throws Exception {
 
 913         AtomicBoolean invoked = new AtomicBoolean();
 
 915         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
 
 917             return CompletableFuture.completedFuture(params.makeOutcome());
 
 920         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 921         controller.complete(params.makeOutcome());
 
 923         CompletableFuture<OperationOutcome> future =
 
 924                         oper.doTask(params, controller, false, task).apply(params.makeOutcome());
 
 925         assertFalse(future.isDone());
 
 926         assertTrue(executor.runAll());
 
 928         // should not have run the task
 
 929         assertFalse(future.isDone());
 
 931         // should not have even invoked the task
 
 932         assertFalse(invoked.get());
 
 936      * Tests doTask(Function) when the previous outcome was successful.
 
 939     public void testDoTaskFunctionSuccess() throws Exception {
 
 940         final OperationOutcome taskOutcome = params.makeOutcome();
 
 942         final OperationOutcome failedOutcome = params.makeOutcome();
 
 944         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
 
 946         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 948         CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome);
 
 950         assertTrue(future.isDone());
 
 951         assertSame(taskOutcome, future.get());
 
 953         // controller should not be done yet
 
 954         assertFalse(controller.isDone());
 
 958      * Tests doTask(Function) when the previous outcome was failed.
 
 961     public void testDoTaskFunctionFailure() throws Exception {
 
 962         final OperationOutcome failedOutcome = params.makeOutcome();
 
 963         failedOutcome.setResult(PolicyResult.FAILURE);
 
 965         AtomicBoolean invoked = new AtomicBoolean();
 
 967         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
 
 969             return CompletableFuture.completedFuture(params.makeOutcome());
 
 972         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 974         CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome);
 
 975         assertFalse(future.isDone());
 
 976         assertTrue(executor.runAll());
 
 978         // should not have run the task
 
 979         assertFalse(future.isDone());
 
 981         // should not have even invoked the task
 
 982         assertFalse(invoked.get());
 
 984         // controller should have the failed task
 
 985         assertTrue(controller.isDone());
 
 986         assertSame(failedOutcome, controller.get());
 
 990      * Tests doTask(Function) when the previous outcome was failed, but not checking
 
 994     public void testDoTaskFunctionUncheckedFailure() throws Exception {
 
 995         final OperationOutcome taskOutcome = params.makeOutcome();
 
 997         final OperationOutcome failedOutcome = params.makeOutcome();
 
 998         failedOutcome.setResult(PolicyResult.FAILURE);
 
1000         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
 
1002         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
1004         CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, task).apply(failedOutcome);
 
1006         assertTrue(future.isDone());
 
1007         assertSame(taskOutcome, future.get());
 
1009         // controller should not be done yet
 
1010         assertFalse(controller.isDone());
 
1014      * Tests callbackStarted() when the pipeline has already been stopped.
 
1017     public void testCallbackStartedNotRunning() {
 
1018         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
 
1021          * arrange to stop the controller when the start-callback is invoked, but capture
 
1024         params = params.toBuilder().startCallback(oper -> {
 
1026             future.get().cancel(false);
 
1029         future.set(oper.startOperation(params));
 
1030         assertTrue(executor.runAll());
 
1032         // should have only run once
 
1033         assertEquals(1, numStart);
 
1037      * Tests callbackCompleted() when the pipeline has already been stopped.
 
1040     public void testCallbackCompletedNotRunning() {
 
1041         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
 
1043         // arrange to stop the controller when the start-callback is invoked
 
1044         params = params.toBuilder().startCallback(oper -> {
 
1045             future.get().cancel(false);
 
1048         future.set(oper.startOperation(params));
 
1049         assertTrue(executor.runAll());
 
1051         // should not have been set
 
1053         assertEquals(0, numEnd);
 
1057     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
 
1058         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
 
1060         OperationOutcome outcome;
 
1062         outcome = new OperationOutcome();
 
1063         oper.setOutcome(params, outcome, timex);
 
1064         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1065         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
 
1067         outcome = new OperationOutcome();
 
1068         oper.setOutcome(params, outcome, new IllegalStateException(EXPECTED_EXCEPTION));
 
1069         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1070         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
 
1074     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
 
1075         OperationOutcome outcome;
 
1077         outcome = new OperationOutcome();
 
1078         oper.setOutcome(params, outcome, PolicyResult.SUCCESS);
 
1079         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
 
1080         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
 
1082         for (PolicyResult result : FAILURE_RESULTS) {
 
1083             outcome = new OperationOutcome();
 
1084             oper.setOutcome(params, outcome, result);
 
1085             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1086             assertEquals(result.toString(), result, outcome.getResult());
 
1091     public void testIsTimeout() {
 
1092         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
 
1094         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
 
1095         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
 
1096         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
 
1097         assertFalse(oper.isTimeout(new CompletionException(null)));
 
1098         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
 
1100         assertTrue(oper.isTimeout(timex));
 
1101         assertTrue(oper.isTimeout(new CompletionException(timex)));
 
1105     public void testGetTimeOutMillis() {
 
1106         assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(params.getTimeoutSec()));
 
1108         params = params.toBuilder().timeoutSec(null).build();
 
1109         assertEquals(0, oper.getTimeOutMillis(params.getTimeoutSec()));
 
1112     private void starter(OperationOutcome oper) {
 
1114         tstart = oper.getStart();
 
1118     private void completer(OperationOutcome oper) {
 
1124      * Gets a function that does nothing.
 
1126      * @param <T> type of input parameter expected by the function
 
1127      * @return a function that does nothing
 
1129     private <T> Consumer<T> noop() {
 
1134     private OperationOutcome makeSuccess() {
 
1135         OperationOutcome outcome = params.makeOutcome();
 
1136         outcome.setResult(PolicyResult.SUCCESS);
 
1141     private OperationOutcome makeFailure() {
 
1142         OperationOutcome outcome = params.makeOutcome();
 
1143         outcome.setResult(PolicyResult.FAILURE);
 
1151      * @param testName test name
 
1152      * @param expectedCallbacks number of callbacks expected
 
1153      * @param expectedOperations number of operation invocations expected
 
1154      * @param expectedResult expected outcome
 
1156     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
 
1157                     PolicyResult expectedResult) {
 
1159         String expectedSubRequestId =
 
1160                         (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
 
1162         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
 
1168      * @param testName test name
 
1169      * @param expectedCallbacks number of callbacks expected
 
1170      * @param expectedOperations number of operation invocations expected
 
1171      * @param expectedResult expected outcome
 
1172      * @param expectedSubRequestId expected sub request ID
 
1173      * @param manipulator function to modify the future returned by
 
1174      *        {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before
 
1175      *        the tasks in the executor are run
 
1177     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
 
1178                     String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
 
1180         CompletableFuture<OperationOutcome> future = oper.startOperation(params);
 
1182         manipulator.accept(future);
 
1184         assertTrue(testName, executor.runAll());
 
1186         assertEquals(testName, expectedCallbacks, numStart);
 
1187         assertEquals(testName, expectedCallbacks, numEnd);
 
1189         if (expectedCallbacks > 0) {
 
1190             assertNotNull(testName, opstart);
 
1191             assertNotNull(testName, opend);
 
1192             assertEquals(testName, expectedResult, opend.getResult());
 
1194             assertSame(testName, tstart, opstart.getStart());
 
1195             assertSame(testName, tstart, opend.getStart());
 
1198                 assertTrue(future.isDone());
 
1199                 assertSame(testName, opend, future.get());
 
1201             } catch (InterruptedException | ExecutionException e) {
 
1202                 throw new IllegalStateException(e);
 
1205             if (expectedOperations > 0) {
 
1206                 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
 
1210         assertEquals(testName, expectedOperations, oper.getCount());
 
1213     private class MyOper extends OperatorPartial {
 
1215         private int count = 0;
 
1218         private boolean genException;
 
1221         private int maxFailures = 0;
 
1224         private CompletableFuture<OperationOutcome> preProcessor;
 
1227             super(ACTOR, OPERATOR);
 
1231         protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt,
 
1232                         OperationOutcome operation) {
 
1235                 throw new IllegalStateException(EXPECTED_EXCEPTION);
 
1238             operation.setSubRequestId(String.valueOf(attempt));
 
1240             if (count > maxFailures) {
 
1241                 operation.setResult(PolicyResult.SUCCESS);
 
1243                 operation.setResult(PolicyResult.FAILURE);
 
1250         protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
 
1251             return (preProcessor != null ? preProcessor : super.startPreprocessorAsync(params));
 
1255         protected Executor getBlockingExecutor() {
 
1261      * Executor that will run tasks until the queue is empty or a maximum number of tasks
 
1262      * have been executed.
 
1264     private static class MyExec implements Executor {
 
1265         private static final int MAX_TASKS = MAX_PARALLEL_REQUESTS * 100;
 
1267         private Queue<Runnable> commands = new LinkedList<>();
 
1273         public int getQueueLength() {
 
1274             return commands.size();
 
1278         public void execute(Runnable command) {
 
1279             commands.add(command);
 
1282         public boolean runAll() {
 
1283             for (int count = 0; count < MAX_TASKS && !commands.isEmpty(); ++count) {
 
1284                 commands.remove().run();
 
1287             return commands.isEmpty();