2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import static org.assertj.core.api.Assertions.assertThat;
 
  24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 
  25 import static org.junit.Assert.assertEquals;
 
  26 import static org.junit.Assert.assertFalse;
 
  27 import static org.junit.Assert.assertNotNull;
 
  28 import static org.junit.Assert.assertNull;
 
  29 import static org.junit.Assert.assertSame;
 
  30 import static org.junit.Assert.assertTrue;
 
  31 import static org.mockito.ArgumentMatchers.any;
 
  32 import static org.mockito.Mockito.when;
 
  34 import ch.qos.logback.classic.Logger;
 
  35 import java.time.Instant;
 
  36 import java.util.ArrayDeque;
 
  37 import java.util.Arrays;
 
  38 import java.util.Collections;
 
  39 import java.util.Deque;
 
  40 import java.util.LinkedList;
 
  41 import java.util.List;
 
  43 import java.util.Map.Entry;
 
  44 import java.util.UUID;
 
  45 import java.util.concurrent.CompletableFuture;
 
  46 import java.util.concurrent.CompletionException;
 
  47 import java.util.concurrent.ExecutionException;
 
  48 import java.util.concurrent.ForkJoinPool;
 
  49 import java.util.concurrent.Future;
 
  50 import java.util.concurrent.TimeUnit;
 
  51 import java.util.concurrent.TimeoutException;
 
  52 import java.util.concurrent.atomic.AtomicReference;
 
  53 import java.util.function.Consumer;
 
  54 import java.util.function.Supplier;
 
  55 import java.util.stream.Collectors;
 
  58 import org.junit.AfterClass;
 
  59 import org.junit.Before;
 
  60 import org.junit.BeforeClass;
 
  61 import org.junit.Test;
 
  62 import org.mockito.Mock;
 
  63 import org.mockito.MockitoAnnotations;
 
  64 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 
  65 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 
  66 import org.onap.policy.common.utils.coder.Coder;
 
  67 import org.onap.policy.common.utils.coder.CoderException;
 
  68 import org.onap.policy.common.utils.coder.StandardCoder;
 
  69 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
 
  70 import org.onap.policy.common.utils.time.PseudoExecutor;
 
  71 import org.onap.policy.controlloop.ControlLoopOperation;
 
  72 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
 
  73 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 
  74 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 
  75 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
 
  76 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
 
  77 import org.onap.policy.controlloop.actorserviceprovider.Operator;
 
  78 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  79 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
 
  80 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
 
  81 import org.slf4j.LoggerFactory;
 
  83 public class OperationPartialTest {
 
  84     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
 
  85     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
 
  86     private static final int MAX_REQUESTS = 100;
 
  87     private static final int MAX_PARALLEL = 10;
 
  88     private static final String EXPECTED_EXCEPTION = "expected exception";
 
  89     private static final String ACTOR = "my-actor";
 
  90     private static final String OPERATION = "my-operation";
 
  91     private static final String MY_SINK = "my-sink";
 
  92     private static final String MY_SOURCE = "my-source";
 
  93     private static final String MY_TARGET_ENTITY = "my-entity";
 
  94     private static final String TEXT = "my-text";
 
  95     private static final int TIMEOUT = 1000;
 
  96     private static final UUID REQ_ID = UUID.randomUUID();
 
  98     private static final List<OperationResult> FAILURE_RESULTS = Arrays.asList(OperationResult.values()).stream()
 
  99                     .filter(result -> result != OperationResult.SUCCESS).collect(Collectors.toList());
 
 102      * Used to attach an appender to the class' logger.
 
 104     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
 
 105     private static final ExtractAppender appender = new ExtractAppender();
 
 107     private static final List<String> PROP_NAMES = List.of("hello", "world");
 
 110     private ActorService service;
 
 112     private Actor guardActor;
 
 114     private Operator guardOperator;
 
 116     private Operation guardOperation;
 
 118     private PseudoExecutor executor;
 
 119     private ControlLoopOperationParams params;
 
 123     private int numStart;
 
 126     private Instant tstart;
 
 128     private OperationOutcome opstart;
 
 129     private OperationOutcome opend;
 
 131     private Deque<OperationOutcome> starts;
 
 132     private Deque<OperationOutcome> ends;
 
 134     private OperatorConfig config;
 
 137      * Attaches the appender to the logger.
 
 140     public static void setUpBeforeClass() throws Exception {
 
 142          * Attach appender to the logger.
 
 144         appender.setContext(logger.getLoggerContext());
 
 147         logger.addAppender(appender);
 
 151      * Stops the appender.
 
 154     public static void tearDownAfterClass() {
 
 159      * Initializes the fields, including {@link #oper}.
 
 162     public void setUp() {
 
 163         MockitoAnnotations.initMocks(this);
 
 164         executor = new PseudoExecutor();
 
 166         params = ControlLoopOperationParams.builder().completeCallback(this::completer).requestId(REQ_ID)
 
 167                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
 
 168                         .startCallback(this::starter).build();
 
 170         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
 
 171         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
 
 172         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
 
 173         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
 
 175         config = new OperatorConfig(executor);
 
 184         starts = new ArrayDeque<>(10);
 
 185         ends = new ArrayDeque<>(10);
 
 189     public void testOperatorPartial_testGetActorName_testGetName() {
 
 190         assertEquals(ACTOR, oper.getActorName());
 
 191         assertEquals(OPERATION, oper.getName());
 
 192         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
 
 196     public void testGetBlockingThread() throws Exception {
 
 197         CompletableFuture<Void> future = new CompletableFuture<>();
 
 199         // use the real executor
 
 200         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
 
 202             public Operation buildOperation(ControlLoopOperationParams params) {
 
 207         oper2.getBlockingExecutor().execute(() -> future.complete(null));
 
 209         assertNull(future.get(5, TimeUnit.SECONDS));
 
 213     public void testGetPropertyNames() {
 
 214         assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
 
 218     public void testGetProperty_testSetProperty_testGetRequiredProperty() {
 
 219         oper.setProperty("propertyA", "valueA");
 
 220         oper.setProperty("propertyB", "valueB");
 
 221         oper.setProperty("propertyC", 20);
 
 222         oper.setProperty("propertyD", "valueD");
 
 224         assertEquals("valueA", oper.getProperty("propertyA"));
 
 225         assertEquals("valueB", oper.getProperty("propertyB"));
 
 226         assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
 
 228         assertEquals("valueD", oper.getRequiredProperty("propertyD", "typeD"));
 
 230         assertThatIllegalStateException().isThrownBy(() -> oper.getRequiredProperty("propertyUnknown", "some type"))
 
 231                         .withMessage("missing some type");
 
 235     public void testStart() {
 
 236         verifyRun("testStart", 1, 1, OperationResult.SUCCESS);
 
 240      * Tests start() with multiple running requests.
 
 243     public void testStartMultiple() {
 
 244         for (int count = 0; count < MAX_PARALLEL; ++count) {
 
 248         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
 
 250         assertNotNull(opstart);
 
 251         assertNotNull(opend);
 
 252         assertEquals(OperationResult.SUCCESS, opend.getResult());
 
 254         assertEquals(MAX_PARALLEL, numStart);
 
 255         assertEquals(MAX_PARALLEL, oper.getCount());
 
 256         assertEquals(MAX_PARALLEL, numEnd);
 
 260     public void testStartOperationAsync() {
 
 262         assertTrue(executor.runAll(MAX_REQUESTS));
 
 264         assertEquals(1, oper.getCount());
 
 268     public void testIsSuccess() {
 
 269         assertFalse(oper.isSuccess(null));
 
 271         OperationOutcome outcome = new OperationOutcome();
 
 273         outcome.setResult(OperationResult.SUCCESS);
 
 274         assertTrue(oper.isSuccess(outcome));
 
 276         for (OperationResult failure : FAILURE_RESULTS) {
 
 277             outcome.setResult(failure);
 
 278             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
 
 283     public void testIsActorFailed() {
 
 284         assertFalse(oper.isActorFailed(null));
 
 286         OperationOutcome outcome = params.makeOutcome();
 
 289         outcome.setResult(OperationResult.SUCCESS);
 
 290         assertFalse(oper.isActorFailed(outcome));
 
 292         outcome.setResult(OperationResult.FAILURE_RETRIES);
 
 293         assertFalse(oper.isActorFailed(outcome));
 
 296         outcome.setResult(OperationResult.FAILURE);
 
 299         outcome.setActor(MY_SINK);
 
 300         assertFalse(oper.isActorFailed(outcome));
 
 301         outcome.setActor(null);
 
 302         assertFalse(oper.isActorFailed(outcome));
 
 303         outcome.setActor(ACTOR);
 
 305         // incorrect operation
 
 306         outcome.setOperation(MY_SINK);
 
 307         assertFalse(oper.isActorFailed(outcome));
 
 308         outcome.setOperation(null);
 
 309         assertFalse(oper.isActorFailed(outcome));
 
 310         outcome.setOperation(OPERATION);
 
 313         assertTrue(oper.isActorFailed(outcome));
 
 317     public void testDoOperation() {
 
 319          * Use an operation that doesn't override doOperation().
 
 321         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
 
 324         assertTrue(executor.runAll(MAX_REQUESTS));
 
 326         assertNotNull(opend);
 
 327         assertEquals(OperationResult.FAILURE_EXCEPTION, opend.getResult());
 
 331     public void testTimeout() throws Exception {
 
 333         // use a real executor
 
 334         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
 
 336         // trigger timeout very quickly
 
 337         oper = new MyOper() {
 
 339             protected long getTimeoutMs(Integer timeoutSec) {
 
 344             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
 346                 OperationOutcome outcome2 = params.makeOutcome();
 
 347                 outcome2.setResult(OperationResult.SUCCESS);
 
 350                  * Create an incomplete future that will timeout after the operation's
 
 351                  * timeout. If it fires before the other timer, then it will return a
 
 354                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
 
 355                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
 
 356                                 params.getExecutor());
 
 362         assertEquals(OperationResult.FAILURE_TIMEOUT, oper.start().get().getResult());
 
 366      * Tests retry functions, when the count is set to zero and retries are exhausted.
 
 369     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
 
 370         params = params.toBuilder().retry(0).build();
 
 372         // new params, thus need a new operation
 
 375         oper.setMaxFailures(10);
 
 377         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, OperationResult.FAILURE);
 
 381      * Tests retry functions, when the count is null and retries are exhausted.
 
 384     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
 
 385         params = params.toBuilder().retry(null).build();
 
 387         // new params, thus need a new operation
 
 390         oper.setMaxFailures(10);
 
 392         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, OperationResult.FAILURE);
 
 396      * Tests retry functions, when retries are exhausted.
 
 399     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
 
 400         final int maxRetries = 3;
 
 401         params = params.toBuilder().retry(maxRetries).build();
 
 403         // new params, thus need a new operation
 
 406         oper.setMaxFailures(10);
 
 408         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
 
 409                         OperationResult.FAILURE_RETRIES);
 
 413      * Tests retry functions, when a success follows some retries.
 
 416     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
 
 417         params = params.toBuilder().retry(10).build();
 
 419         // new params, thus need a new operation
 
 422         final int maxFailures = 3;
 
 423         oper.setMaxFailures(maxFailures);
 
 425         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
 
 426                         OperationResult.SUCCESS);
 
 430      * Tests retry functions, when the outcome is {@code null}.
 
 433     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
 
 435         // arrange to return null from doOperation()
 
 436         oper = new MyOper() {
 
 438             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
 
 441                 super.doOperation(attempt, outcome);
 
 446         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, OperationResult.FAILURE, noop());
 
 450     public void testSleep() throws Exception {
 
 451         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
 
 452         assertTrue(future.isDone());
 
 453         assertNull(future.get());
 
 456         future = oper.sleep(0, TimeUnit.SECONDS);
 
 457         assertTrue(future.isDone());
 
 458         assertNull(future.get());
 
 461          * Start a second sleep we can use to check the first while it's running.
 
 463         tstart = Instant.now();
 
 464         future = oper.sleep(100, TimeUnit.MILLISECONDS);
 
 466         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
 
 468         // wait for second to complete and verify that the first has not completed
 
 470         assertFalse(future.isDone());
 
 472         // wait for second to complete
 
 475         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
 
 476         assertTrue(diff >= 99);
 
 480     public void testIsSameOperation() {
 
 481         assertFalse(oper.isSameOperation(null));
 
 483         OperationOutcome outcome = params.makeOutcome();
 
 485         // wrong actor - should be false
 
 486         outcome.setActor(null);
 
 487         assertFalse(oper.isSameOperation(outcome));
 
 488         outcome.setActor(MY_SINK);
 
 489         assertFalse(oper.isSameOperation(outcome));
 
 490         outcome.setActor(ACTOR);
 
 492         // wrong operation - should be null
 
 493         outcome.setOperation(null);
 
 494         assertFalse(oper.isSameOperation(outcome));
 
 495         outcome.setOperation(MY_SINK);
 
 496         assertFalse(oper.isSameOperation(outcome));
 
 497         outcome.setOperation(OPERATION);
 
 499         assertTrue(oper.isSameOperation(outcome));
 
 503     public void testFromException() {
 
 504         // arrange to generate an exception when operation runs
 
 505         oper.setGenException(true);
 
 507         verifyRun("testFromException", 1, 1, OperationResult.FAILURE_EXCEPTION);
 
 511      * Tests fromException() when there is no exception.
 
 514     public void testFromExceptionNoExcept() {
 
 515         verifyRun("testFromExceptionNoExcept", 1, 1, OperationResult.SUCCESS);
 
 519      * Tests both flavors of anyOf(), because one invokes the other.
 
 522     public void testAnyOf() throws Exception {
 
 523         // first task completes, others do not
 
 524         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 526         final OperationOutcome outcome = params.makeOutcome();
 
 528         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 529         tasks.add(() -> new CompletableFuture<>());
 
 530         tasks.add(() -> null);
 
 531         tasks.add(() -> new CompletableFuture<>());
 
 533         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
 
 534         assertTrue(executor.runAll(MAX_REQUESTS));
 
 535         assertTrue(result.isDone());
 
 536         assertSame(outcome, result.get());
 
 538         // repeat using array form
 
 539         @SuppressWarnings("unchecked")
 
 540         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
 
 541         result = oper.anyOf(tasks.toArray(taskArray));
 
 542         assertTrue(executor.runAll(MAX_REQUESTS));
 
 543         assertTrue(result.isDone());
 
 544         assertSame(outcome, result.get());
 
 546         // second task completes, others do not
 
 548         tasks.add(() -> new CompletableFuture<>());
 
 549         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 550         tasks.add(() -> new CompletableFuture<>());
 
 552         result = oper.anyOf(tasks);
 
 553         assertTrue(executor.runAll(MAX_REQUESTS));
 
 554         assertTrue(result.isDone());
 
 555         assertSame(outcome, result.get());
 
 557         // third task completes, others do not
 
 559         tasks.add(() -> new CompletableFuture<>());
 
 560         tasks.add(() -> new CompletableFuture<>());
 
 561         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 563         result = oper.anyOf(tasks);
 
 564         assertTrue(executor.runAll(MAX_REQUESTS));
 
 565         assertTrue(result.isDone());
 
 566         assertSame(outcome, result.get());
 
 570      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
 
 573     @SuppressWarnings("unchecked")
 
 574     public void testAnyOfEdge() throws Exception {
 
 575         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 577         // zero items: check both using a list and using an array
 
 578         assertNull(oper.anyOf(tasks));
 
 579         assertNull(oper.anyOf());
 
 581         // one item: : check both using a list and using an array
 
 582         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 583         tasks.add(() -> future1);
 
 585         assertSame(future1, oper.anyOf(tasks));
 
 586         assertSame(future1, oper.anyOf(() -> future1));
 
 590     public void testAllOfArray() throws Exception {
 
 591         final OperationOutcome outcome = params.makeOutcome();
 
 593         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 594         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 595         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 597         @SuppressWarnings("unchecked")
 
 598         CompletableFuture<OperationOutcome> result =
 
 599                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
 
 601         assertTrue(executor.runAll(MAX_REQUESTS));
 
 602         assertFalse(result.isDone());
 
 603         future1.complete(outcome);
 
 605         // complete 3 before 2
 
 606         assertTrue(executor.runAll(MAX_REQUESTS));
 
 607         assertFalse(result.isDone());
 
 608         future3.complete(outcome);
 
 610         assertTrue(executor.runAll(MAX_REQUESTS));
 
 611         assertFalse(result.isDone());
 
 612         future2.complete(outcome);
 
 614         // all of them are now done
 
 615         assertTrue(executor.runAll(MAX_REQUESTS));
 
 616         assertTrue(result.isDone());
 
 617         assertSame(outcome, result.get());
 
 621     public void testAllOfList() throws Exception {
 
 622         final OperationOutcome outcome = params.makeOutcome();
 
 624         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 625         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 626         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 628         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 629         tasks.add(() -> future1);
 
 630         tasks.add(() -> future2);
 
 631         tasks.add(() -> null);
 
 632         tasks.add(() -> future3);
 
 634         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
 636         assertTrue(executor.runAll(MAX_REQUESTS));
 
 637         assertFalse(result.isDone());
 
 638         future1.complete(outcome);
 
 640         // complete 3 before 2
 
 641         assertTrue(executor.runAll(MAX_REQUESTS));
 
 642         assertFalse(result.isDone());
 
 643         future3.complete(outcome);
 
 645         assertTrue(executor.runAll(MAX_REQUESTS));
 
 646         assertFalse(result.isDone());
 
 647         future2.complete(outcome);
 
 649         // all of them are now done
 
 650         assertTrue(executor.runAll(MAX_REQUESTS));
 
 651         assertTrue(result.isDone());
 
 652         assertSame(outcome, result.get());
 
 656      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
 
 659     @SuppressWarnings("unchecked")
 
 660     public void testAllOfEdge() throws Exception {
 
 661         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 663         // zero items: check both using a list and using an array
 
 664         assertNull(oper.allOf(tasks));
 
 665         assertNull(oper.allOf());
 
 667         // one item: : check both using a list and using an array
 
 668         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 669         tasks.add(() -> future1);
 
 671         assertSame(future1, oper.allOf(tasks));
 
 672         assertSame(future1, oper.allOf(() -> future1));
 
 676     public void testAttachFutures() throws Exception {
 
 677         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 679         // third task throws an exception during construction
 
 680         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 681         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 682         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 683         tasks.add(() -> future1);
 
 684         tasks.add(() -> future2);
 
 686             throw new IllegalStateException(EXPECTED_EXCEPTION);
 
 688         tasks.add(() -> future3);
 
 690         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
 
 692         // should have canceled the first two, but not the last
 
 693         assertTrue(future1.isCancelled());
 
 694         assertTrue(future2.isCancelled());
 
 695         assertFalse(future3.isCancelled());
 
 699     public void testCombineOutcomes() throws Exception {
 
 701         verifyOutcomes(0, OperationResult.SUCCESS);
 
 702         verifyOutcomes(0, OperationResult.FAILURE_EXCEPTION);
 
 704         // maximum is in different positions
 
 705         verifyOutcomes(0, OperationResult.FAILURE, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD);
 
 706         verifyOutcomes(1, OperationResult.SUCCESS, OperationResult.FAILURE, OperationResult.FAILURE_GUARD);
 
 707         verifyOutcomes(2, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD, OperationResult.FAILURE);
 
 709         // null outcome - takes precedence over a success
 
 710         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 711         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
 
 712         tasks.add(() -> CompletableFuture.completedFuture(null));
 
 713         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
 
 714         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
 716         assertTrue(executor.runAll(MAX_REQUESTS));
 
 717         assertTrue(result.isDone());
 
 718         assertNull(result.get());
 
 720         // one throws an exception during execution
 
 721         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
 
 724         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
 
 725         tasks.add(() -> CompletableFuture.failedFuture(except));
 
 726         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
 
 727         result = oper.allOf(tasks);
 
 729         assertTrue(executor.runAll(MAX_REQUESTS));
 
 730         assertTrue(result.isCompletedExceptionally());
 
 731         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
 
 735      * Tests both flavors of sequence(), because one invokes the other.
 
 738     public void testSequence() throws Exception {
 
 739         final OperationOutcome outcome = params.makeOutcome();
 
 741         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 742         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 743         tasks.add(() -> null);
 
 744         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 745         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 747         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
 
 748         assertTrue(executor.runAll(MAX_REQUESTS));
 
 749         assertTrue(result.isDone());
 
 750         assertSame(outcome, result.get());
 
 752         // repeat using array form
 
 753         @SuppressWarnings("unchecked")
 
 754         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
 
 755         result = oper.sequence(tasks.toArray(taskArray));
 
 756         assertTrue(executor.runAll(MAX_REQUESTS));
 
 757         assertTrue(result.isDone());
 
 758         assertSame(outcome, result.get());
 
 760         // second task fails, third should not run
 
 761         OperationOutcome failure = params.makeOutcome();
 
 762         failure.setResult(OperationResult.FAILURE);
 
 764         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 765         tasks.add(() -> CompletableFuture.completedFuture(failure));
 
 766         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 768         result = oper.sequence(tasks);
 
 769         assertTrue(executor.runAll(MAX_REQUESTS));
 
 770         assertTrue(result.isDone());
 
 771         assertSame(failure, result.get());
 
 775      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
 
 778     @SuppressWarnings("unchecked")
 
 779     public void testSequenceEdge() throws Exception {
 
 780         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 782         // zero items: check both using a list and using an array
 
 783         assertNull(oper.sequence(tasks));
 
 784         assertNull(oper.sequence());
 
 786         // one item: : check both using a list and using an array
 
 787         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 788         tasks.add(() -> future1);
 
 790         assertSame(future1, oper.sequence(tasks));
 
 791         assertSame(future1, oper.sequence(() -> future1));
 
 794     private void verifyOutcomes(int expected, OperationResult... results) throws Exception {
 
 795         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 797         OperationOutcome expectedOutcome = null;
 
 799         for (int count = 0; count < results.length; ++count) {
 
 800             OperationOutcome outcome = params.makeOutcome();
 
 801             outcome.setResult(results[count]);
 
 802             tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 804             if (count == expected) {
 
 805                 expectedOutcome = outcome;
 
 809         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
 811         assertTrue(executor.runAll(MAX_REQUESTS));
 
 812         assertTrue(result.isDone());
 
 813         assertSame(expectedOutcome, result.get());
 
 817     public void testDetmPriority() throws CoderException {
 
 818         assertEquals(1, oper.detmPriority(null));
 
 820         OperationOutcome outcome = params.makeOutcome();
 
 822         Map<OperationResult, Integer> map = Map.of(OperationResult.SUCCESS, 0, OperationResult.FAILURE_GUARD, 2,
 
 823                 OperationResult.FAILURE_RETRIES, 3, OperationResult.FAILURE, 4, OperationResult.FAILURE_TIMEOUT, 5,
 
 824                 OperationResult.FAILURE_EXCEPTION, 6);
 
 826         for (Entry<OperationResult, Integer> ent : map.entrySet()) {
 
 827             outcome.setResult(ent.getKey());
 
 828             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
 
 832          * Test null result. We can't actually set it to null, because the set() method
 
 833          * won't allow it. Instead, we decode it from a structure.
 
 835         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
 
 836         assertEquals(1, oper.detmPriority(outcome));
 
 840      * Tests callbackStarted() when the pipeline has already been stopped.
 
 843     public void testCallbackStartedNotRunning() {
 
 844         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
 
 847          * arrange to stop the controller when the start-callback is invoked, but capture
 
 850         params = params.toBuilder().startCallback(oper -> {
 
 852             future.get().cancel(false);
 
 855         // new params, thus need a new operation
 
 858         future.set(oper.start());
 
 859         assertTrue(executor.runAll(MAX_REQUESTS));
 
 861         // should have only run once
 
 862         assertEquals(1, numStart);
 
 866      * Tests callbackCompleted() when the pipeline has already been stopped.
 
 869     public void testCallbackCompletedNotRunning() {
 
 870         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
 
 872         // arrange to stop the controller when the start-callback is invoked
 
 873         params = params.toBuilder().startCallback(oper -> {
 
 874             future.get().cancel(false);
 
 877         // new params, thus need a new operation
 
 880         future.set(oper.start());
 
 881         assertTrue(executor.runAll(MAX_REQUESTS));
 
 883         // should not have been set
 
 885         assertEquals(0, numEnd);
 
 889     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
 
 890         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
 
 892         OperationOutcome outcome;
 
 894         outcome = new OperationOutcome();
 
 895         oper.setOutcome(outcome, timex);
 
 896         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
 897         assertEquals(OperationResult.FAILURE_TIMEOUT, outcome.getResult());
 
 899         outcome = new OperationOutcome();
 
 900         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
 
 901         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
 902         assertEquals(OperationResult.FAILURE_EXCEPTION, outcome.getResult());
 
 906     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
 
 907         OperationOutcome outcome;
 
 909         outcome = new OperationOutcome();
 
 910         oper.setOutcome(outcome, OperationResult.SUCCESS);
 
 911         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
 
 912         assertEquals(OperationResult.SUCCESS, outcome.getResult());
 
 914         oper.setOutcome(outcome, OperationResult.SUCCESS);
 
 915         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
 
 916         assertEquals(OperationResult.SUCCESS, outcome.getResult());
 
 918         for (OperationResult result : FAILURE_RESULTS) {
 
 919             outcome = new OperationOutcome();
 
 920             oper.setOutcome(outcome, result);
 
 921             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
 922             assertEquals(result.toString(), result, outcome.getResult());
 
 927     public void testMakeOutcome() {
 
 928         oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, MY_TARGET_ENTITY);
 
 929         assertEquals(MY_TARGET_ENTITY, oper.makeOutcome().getTarget());
 
 933     public void testIsTimeout() {
 
 934         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
 
 936         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
 
 937         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
 
 938         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
 
 939         assertFalse(oper.isTimeout(new CompletionException(null)));
 
 940         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
 
 942         assertTrue(oper.isTimeout(timex));
 
 943         assertTrue(oper.isTimeout(new CompletionException(timex)));
 
 947     public void testLogMessage() {
 
 948         final String infraStr = SINK_INFRA.toString();
 
 950         // log structured data
 
 951         appender.clearExtractions();
 
 952         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
 
 953         List<String> output = appender.getExtracted();
 
 954         assertEquals(1, output.size());
 
 956         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
 
 957                         .contains("{\n  \"text\": \"my-text\"\n}");
 
 959         // repeat with a response
 
 960         appender.clearExtractions();
 
 961         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
 
 962         output = appender.getExtracted();
 
 963         assertEquals(1, output.size());
 
 965         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
 
 966                         .contains("{\n  \"text\": \"my-text\"\n}");
 
 968         // log a plain string
 
 969         appender.clearExtractions();
 
 970         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
 
 971         output = appender.getExtracted();
 
 972         assertEquals(1, output.size());
 
 973         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
 
 975         // log a null request
 
 976         appender.clearExtractions();
 
 977         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
 
 978         output = appender.getExtracted();
 
 979         assertEquals(1, output.size());
 
 981         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
 
 983         // generate exception from coder
 
 984         setOperCoderException();
 
 986         appender.clearExtractions();
 
 987         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
 
 988         output = appender.getExtracted();
 
 989         assertEquals(2, output.size());
 
 990         assertThat(output.get(0)).contains("cannot pretty-print request");
 
 991         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
 
 993         // repeat with a response
 
 994         appender.clearExtractions();
 
 995         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
 
 996         output = appender.getExtracted();
 
 997         assertEquals(2, output.size());
 
 998         assertThat(output.get(0)).contains("cannot pretty-print response");
 
 999         assertThat(output.get(1)).contains(MY_SOURCE);
 
1003     public void testGetRetry() {
 
1004         assertEquals(0, oper.getRetry(null));
 
1005         assertEquals(10, oper.getRetry(10));
 
1009     public void testGetRetryWait() {
 
1010         // need an operator that doesn't override the retry time
 
1011         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
 
1012         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
 
1016     public void testGetTimeOutMs() {
 
1017         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
 
1019         params = params.toBuilder().timeoutSec(null).build();
 
1021         // new params, thus need a new operation
 
1022         oper = new MyOper();
 
1024         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
 
1027     private void starter(OperationOutcome oper) {
 
1029         tstart = oper.getStart();
 
1034     private void completer(OperationOutcome oper) {
 
1041      * Gets a function that does nothing.
 
1043      * @param <T> type of input parameter expected by the function
 
1044      * @return a function that does nothing
 
1046     private <T> Consumer<T> noop() {
 
1051     private OperationOutcome makeSuccess() {
 
1052         OperationOutcome outcome = params.makeOutcome();
 
1053         outcome.setResult(OperationResult.SUCCESS);
 
1061      * @param testName test name
 
1062      * @param expectedCallbacks number of callbacks expected
 
1063      * @param expectedOperations number of operation invocations expected
 
1064      * @param expectedResult expected outcome
 
1066     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
 
1067             OperationResult expectedResult) {
 
1069         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
 
1075      * @param testName test name
 
1076      * @param expectedCallbacks number of callbacks expected
 
1077      * @param expectedOperations number of operation invocations expected
 
1078      * @param expectedResult expected outcome
 
1079      * @param manipulator function to modify the future returned by
 
1080      *        {@link OperationPartial#start()} before the tasks
 
1081      *        in the executor are run
 
1083     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
 
1084             OperationResult expectedResult, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
 
1092         CompletableFuture<OperationOutcome> future = oper.start();
 
1094         manipulator.accept(future);
 
1096         assertTrue(testName, executor.runAll(MAX_REQUESTS));
 
1098         assertEquals(testName, expectedCallbacks, numStart);
 
1099         assertEquals(testName, expectedCallbacks, numEnd);
 
1101         if (expectedCallbacks > 0) {
 
1102             assertNotNull(testName, opstart);
 
1103             assertNotNull(testName, opend);
 
1104             assertEquals(testName, expectedResult, opend.getResult());
 
1106             assertSame(testName, tstart, opstart.getStart());
 
1107             assertSame(testName, tstart, opend.getStart());
 
1110                 assertTrue(future.isDone());
 
1111                 assertEquals(testName, opend, future.get());
 
1113                 // "start" is never final
 
1114                 for (OperationOutcome outcome : starts) {
 
1115                     assertFalse(testName, outcome.isFinalOutcome());
 
1118                 // only the last "complete" is final
 
1119                 assertTrue(testName, ends.removeLast().isFinalOutcome());
 
1121                 for (OperationOutcome outcome : ends) {
 
1122                     assertFalse(outcome.isFinalOutcome());
 
1125             } catch (InterruptedException | ExecutionException e) {
 
1126                 throw new IllegalStateException(e);
 
1129             if (expectedOperations > 0) {
 
1130                 assertNotNull(testName, oper.getSubRequestId());
 
1131                 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
 
1132                 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
 
1136         assertEquals(testName, expectedOperations, oper.getCount());
 
1140      * Creates a new {@link #oper} whose coder will throw an exception.
 
1142     private void setOperCoderException() {
 
1143         oper = new MyOper() {
 
1145             protected Coder getCoder() {
 
1146                 return new StandardCoder() {
 
1148                     public String encode(Object object, boolean pretty) throws CoderException {
 
1149                         throw new CoderException(EXPECTED_EXCEPTION);
 
1158     public static class MyData {
 
1159         private String text = TEXT;
 
1163     private class MyOper extends OperationPartial {
 
1165         private int count = 0;
 
1168         private boolean genException;
 
1170         private int maxFailures = 0;
 
1172         private CompletableFuture<OperationOutcome> preProc;
 
1176             super(OperationPartialTest.this.params, config, PROP_NAMES);
 
1180         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
 
1183                 throw new IllegalStateException(EXPECTED_EXCEPTION);
 
1186             operation.setSubRequestId(String.valueOf(attempt));
 
1188             if (count > maxFailures) {
 
1189                 operation.setResult(OperationResult.SUCCESS);
 
1191                 operation.setResult(OperationResult.FAILURE);
 
1198         protected long getRetryWaitMs() {
 
1200              * Sleep timers run in the background, but we want to control things via the
 
1201              * "executor", thus we avoid sleep timers altogether by simply returning 0.