2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import static org.assertj.core.api.Assertions.assertThat;
 
  24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 
  25 import static org.junit.Assert.assertEquals;
 
  26 import static org.junit.Assert.assertFalse;
 
  27 import static org.junit.Assert.assertNotNull;
 
  28 import static org.junit.Assert.assertNull;
 
  29 import static org.junit.Assert.assertSame;
 
  30 import static org.junit.Assert.assertTrue;
 
  31 import static org.mockito.ArgumentMatchers.any;
 
  32 import static org.mockito.Mockito.verify;
 
  33 import static org.mockito.Mockito.when;
 
  35 import ch.qos.logback.classic.Logger;
 
  36 import java.time.Instant;
 
  37 import java.util.ArrayDeque;
 
  38 import java.util.Arrays;
 
  39 import java.util.Collections;
 
  40 import java.util.Deque;
 
  41 import java.util.LinkedList;
 
  42 import java.util.List;
 
  44 import java.util.Map.Entry;
 
  45 import java.util.UUID;
 
  46 import java.util.concurrent.CompletableFuture;
 
  47 import java.util.concurrent.CompletionException;
 
  48 import java.util.concurrent.ExecutionException;
 
  49 import java.util.concurrent.ForkJoinPool;
 
  50 import java.util.concurrent.Future;
 
  51 import java.util.concurrent.TimeUnit;
 
  52 import java.util.concurrent.TimeoutException;
 
  53 import java.util.concurrent.atomic.AtomicReference;
 
  54 import java.util.function.Consumer;
 
  55 import java.util.function.Supplier;
 
  56 import java.util.stream.Collectors;
 
  59 import org.junit.AfterClass;
 
  60 import org.junit.Before;
 
  61 import org.junit.BeforeClass;
 
  62 import org.junit.Test;
 
  63 import org.mockito.ArgumentCaptor;
 
  64 import org.mockito.Mock;
 
  65 import org.mockito.MockitoAnnotations;
 
  66 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 
  67 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 
  68 import org.onap.policy.common.utils.coder.Coder;
 
  69 import org.onap.policy.common.utils.coder.CoderException;
 
  70 import org.onap.policy.common.utils.coder.StandardCoder;
 
  71 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
 
  72 import org.onap.policy.common.utils.time.PseudoExecutor;
 
  73 import org.onap.policy.controlloop.ControlLoopOperation;
 
  74 import org.onap.policy.controlloop.VirtualControlLoopEvent;
 
  75 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
 
  76 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 
  77 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 
  78 import org.onap.policy.controlloop.actorserviceprovider.Operator;
 
  79 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
 
  80 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  81 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
 
  82 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
 
  83 import org.onap.policy.controlloop.policy.PolicyResult;
 
  84 import org.slf4j.LoggerFactory;
 
  86 public class OperationPartialTest {
 
  87     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
 
  88     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
 
  89     private static final int MAX_REQUESTS = 100;
 
  90     private static final int MAX_PARALLEL = 10;
 
  91     private static final String EXPECTED_EXCEPTION = "expected exception";
 
  92     private static final String ACTOR = "my-actor";
 
  93     private static final String OPERATION = "my-operation";
 
  94     private static final String MY_SINK = "my-sink";
 
  95     private static final String MY_SOURCE = "my-source";
 
  96     private static final String MY_TARGET_ENTITY = "my-entity";
 
  97     private static final String TEXT = "my-text";
 
  98     private static final int TIMEOUT = 1000;
 
  99     private static final UUID REQ_ID = UUID.randomUUID();
 
 101     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
 
 102                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
 
 105      * Used to attach an appender to the class' logger.
 
 107     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
 
 108     private static final ExtractAppender appender = new ExtractAppender();
 
 110     private static final List<String> PROP_NAMES = List.of("hello", "world");
 
 113     private ActorService service; // stubbed with when(...) in setUp()
         // NOTE(review): service and the three guard* fields below are stubbed with when(...)
         // after MockitoAnnotations.initMocks(this), so they are presumably @Mock fields whose
         // annotation lines are not visible in this listing - confirm.

 115     private Actor guardActor;

 117     private Operator guardOperator;

 119     private Operation guardOperation;

         // Event, context, executor and operation parameters; all are rebuilt in setUp().
 121     private VirtualControlLoopEvent event;

 122     private ControlLoopEventContext context;

 123     private PseudoExecutor executor;

 124     private ControlLoopOperationParams params;

         // NOTE(review): embedded numbers jump 124 -> 128 -> 131; the tests also use "oper" and
         // "numEnd", whose declarations are presumably on the lines missing here - confirm.
 128     private int numStart;

 131     private Instant tstart;

         // Outcomes that the tests inspect after a run (see assertNotNull(opstart/opend)).
 133     private OperationOutcome opstart;

 134     private OperationOutcome opend;

         // Outcome queues; re-created with an initial capacity of 10 in setUp().
 136     private Deque<OperationOutcome> starts;

 137     private Deque<OperationOutcome> ends;

         // Operator configuration built from the pseudo executor in setUp().
 139     private OperatorConfig config;
 
 142      * Attaches the appender to the logger.
 
 145     public static void setUpBeforeClass() throws Exception {
             // One-time class setup: gives the shared ExtractAppender the logger's context and
             // registers the appender on the OperationPartial logger.
             // NOTE(review): embedded numbers jump 149 -> 152, so one or more short statements
             // are missing from this listing - confirm against the full file.

 147          * Attach appender to the logger.

 149         appender.setContext(logger.getLoggerContext());

 152         logger.addAppender(appender);
 
 156      * Stops the appender.
 
 159     public static void tearDownAfterClass() {
             // Class-level teardown. NOTE(review): the body is not visible in this listing;
             // presumably it stops the shared appender - confirm against the full file.
 
 164      * Initializes the fields, including {@link #oper}.
 
 167     public void setUp() {
             // Per-test fixture: initializes the Mockito fields, builds a fresh event, context,
             // executor and parameter set, and wires the guard actor/operator/operation stubs.

 168         MockitoAnnotations.initMocks(this);

             // event carrying the fixed request id shared by all tests
 170         event = new VirtualControlLoopEvent();

 171         event.setRequestId(REQ_ID);

 173         context = new ControlLoopEventContext(event);

             // pseudo executor: queued work only runs when a test calls executor.runAll(...)
 174         executor = new PseudoExecutor();

             // start/complete callbacks are routed to this test class (starter / completer)
 176         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)

 177                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)

 178                         .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();

             // guard lookup chain: service -> guard actor -> guard operator -> guard operation,
             // whose start() yields an already-completed future holding makeSuccess()
 180         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);

 181         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);

 182         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);

 183         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));

 185         config = new OperatorConfig(executor);

             // NOTE(review): embedded numbers jump 185 -> 194; the statements that create "oper"
             // and reset the counters/outcomes are presumably on the missing lines - confirm.
 194         starts = new ArrayDeque<>(10);

 195         ends = new ArrayDeque<>(10);
 
 199     public void testOperatorPartial_testGetActorName_testGetName() {
 
 200         assertEquals(ACTOR, oper.getActorName());
 
 201         assertEquals(OPERATION, oper.getName());
 
 202         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
 
 206     public void testGetBlockingThread() throws Exception {
             // Submits a task to the operator's blocking executor and waits (up to 5 seconds)
             // for it to complete the future, proving the executor actually runs work.

 207         CompletableFuture<Void> future = new CompletableFuture<>();

 209         // use the real executor

 210         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {

 212             public Operation buildOperation(ControlLoopOperationParams params) {
                     // NOTE(review): embedded numbers jump 212 -> 217; the body of this override
                     // and the closing of the anonymous class are not visible in this listing.

 217         oper2.getBlockingExecutor().execute(() -> future.complete(null));

             // the task completes the future with null, so get() must return null in time
 219         assertNull(future.get(5, TimeUnit.SECONDS));
 
 223     public void testGetPropertyNames() {
 
 224         assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
 
 228     public void testGetProperty_testSetProperty() {
 
 229         oper.setProperty("propertyA", "valueA");
 
 230         oper.setProperty("propertyB", "valueB");
 
 231         oper.setProperty("propertyC", 20);
 
 233         assertEquals("valueA", oper.getProperty("propertyA"));
 
 234         assertEquals("valueB", oper.getProperty("propertyB"));
 
 235         assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
 
 239     public void testStart() {
 
 240         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
 
 244      * Tests start() with multiple running requests.
 
 247     public void testStartMultiple() {
             // Drives MAX_PARALLEL operations through the executor and checks that the start
             // count, operation count and end count all equal MAX_PARALLEL.

 248         for (int count = 0; count < MAX_PARALLEL; ++count) {
                 // NOTE(review): embedded numbers jump 248 -> 252; the loop body (presumably a
                 // call that starts the operation) and its closing brace are not visible here.

             // budget scaled by the number of parallel runs so the queue can fully drain
 252         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));

 254         assertNotNull(opstart);

 255         assertNotNull(opend);

 256         assertEquals(PolicyResult.SUCCESS, opend.getResult());

 258         assertEquals(MAX_PARALLEL, numStart);

 259         assertEquals(MAX_PARALLEL, oper.getCount());

 260         assertEquals(MAX_PARALLEL, numEnd);
 
 264      * Tests startPreprocessor() when the preprocessor returns a failure.
 
 267     public void testStartPreprocessorFailure() {
 
 268         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
 
 270         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
 
 274      * Tests startPreprocessor() when the preprocessor throws an exception.
 
 277     public void testStartPreprocessorException() {
 
 278         // arrange for the preprocessor to throw an exception
 
 279         oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
 
 281         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
 
 285      * Tests startPreprocessor() when the pipeline is not running.
 
 288     public void testStartPreprocessorNotRunning() {
             // Starts the operation, cancels the returned future at once, drains the executor,
             // and then expects that nothing was started, counted or ended.

 289         // arrange for the preprocessor to return success, which will be ignored

             // NOTE(review): the next line is commented-out code; the "arrange" step described
             // above is therefore not performed - remove the line or restore the call.
 290         // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));

 292         oper.start().cancel(false);

 293         assertTrue(executor.runAll(MAX_REQUESTS));

             // NOTE(review): embedded numbers jump 293 -> 298; short assertions may be missing
             // from this listing - confirm against the full file.
 298         assertEquals(0, numStart);

 299         assertEquals(0, oper.getCount());

 300         assertEquals(0, numEnd);
 
 304      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
 
 307     public void testStartPreprocessorBuilderException() {
 
 308         oper = new MyOper() {
 
 310             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
 
 311                 throw new IllegalStateException(EXPECTED_EXCEPTION);
 
 315         assertThatIllegalStateException().isThrownBy(() -> oper.start());
 
 317         // should be nothing in the queue
 
 318         assertEquals(0, executor.getQueueLength());
 
 322     public void testStartPreprocessorAsync() {
 
 323         assertNull(oper.startPreprocessorAsync());
 
 327     public void testStartGuardAsync() throws Exception {
 
 328         CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
 
 329         assertTrue(future.isDone());
 
 330         assertEquals(PolicyResult.SUCCESS, future.get().getResult());
 
 332         // verify the parameters that were passed
 
 333         ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
 
 334                         ArgumentCaptor.forClass(ControlLoopOperationParams.class);
 
 335         verify(guardOperator).buildOperation(paramsCaptor.capture());
 
 337         params = paramsCaptor.getValue();
 
 338         assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
 
 339         assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
 
 340         assertNull(params.getRetry());
 
 341         assertNull(params.getTimeoutSec());
 
 343         Map<String, Object> payload = params.getPayload();
 
 344         assertNotNull(payload);
 
 346         assertEquals(oper.makeGuardPayload(), payload);
 
 350      * Tests startGuardAsync() when preprocessing is disabled.
 
 353     public void testStartGuardAsyncDisabled() {
 
 354         params = params.toBuilder().preprocessed(true).build();
 
 355         assertNull(new MyOper().startGuardAsync());
 
 359     public void testMakeGuardPayload() {
 
 360         Map<String, Object> payload = oper.makeGuardPayload();
 
 361         assertSame(REQ_ID, payload.get("requestId"));
 
 363         // request id changes, so remove it
 
 364         payload.remove("requestId");
 
 366         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity}", payload.toString());
 
 368         // repeat, but with closed loop name
 
 369         event.setClosedLoopControlName("my-loop");
 
 370         payload = oper.makeGuardPayload();
 
 371         payload.remove("requestId");
 
 372         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity, clname=my-loop}", payload.toString());
 
 376     public void testStartOperationAsync() {
             // Drains the executor and expects exactly one operation to have been counted.
             // NOTE(review): embedded numbers jump 376 -> 378; the statement that starts the
             // operation is presumably on the missing line - confirm against the full file.

 378         assertTrue(executor.runAll(MAX_REQUESTS));

 380         assertEquals(1, oper.getCount());
 
 384     public void testIsSuccess() {
 
 385         assertFalse(oper.isSuccess(null));
 
 387         OperationOutcome outcome = new OperationOutcome();
 
 389         outcome.setResult(PolicyResult.SUCCESS);
 
 390         assertTrue(oper.isSuccess(outcome));
 
 392         for (PolicyResult failure : FAILURE_RESULTS) {
 
 393             outcome.setResult(failure);
 
 394             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
 
 399     public void testIsActorFailed() {
             // Walks one outcome through a series of result/actor/operation combinations and
             // expects isActorFailed() to be true only for the final one (result FAILURE with
             // this test's ACTOR and OPERATION).
             // NOTE(review): the embedded numbering has several two-line gaps (402 -> 405,
             // 409 -> 412, 412 -> 415, 426 -> 429); presumably short comments - confirm.

 400         assertFalse(oper.isActorFailed(null));

 402         OperationOutcome outcome = params.makeOutcome();

             // results other than FAILURE are not reported as an actor failure
 405         outcome.setResult(PolicyResult.SUCCESS);

 406         assertFalse(oper.isActorFailed(outcome));

 408         outcome.setResult(PolicyResult.FAILURE_RETRIES);

 409         assertFalse(oper.isActorFailed(outcome));

 412         outcome.setResult(PolicyResult.FAILURE);

             // FAILURE result, but a different or null actor
 415         outcome.setActor(MY_SINK);

 416         assertFalse(oper.isActorFailed(outcome));

 417         outcome.setActor(null);

 418         assertFalse(oper.isActorFailed(outcome));

 419         outcome.setActor(ACTOR);

 421         // incorrect operation

 422         outcome.setOperation(MY_SINK);

 423         assertFalse(oper.isActorFailed(outcome));

 424         outcome.setOperation(null);

 425         assertFalse(oper.isActorFailed(outcome));

 426         outcome.setOperation(OPERATION);

             // FAILURE result with matching actor and operation
 429         assertTrue(oper.isActorFailed(outcome));
 
 433     public void testDoOperation() {
             // Uses a bare anonymous OperationPartial (no overrides) and expects the final
             // outcome to be FAILURE_EXCEPTION.

 435          * Use an operation that doesn't override doOperation().

 437         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};

             // NOTE(review): embedded numbers jump 437 -> 440; the statement that starts oper2
             // is presumably on a missing line - confirm against the full file.
 440         assertTrue(executor.runAll(MAX_REQUESTS));

 442         assertNotNull(opend);

 443         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
 
 447     public void testTimeout() throws Exception {
             // Runs an operation on the common ForkJoinPool and expects FAILURE_TIMEOUT.
             // NOTE(review): several lines of the two overrides below (numbers 456-459 and
             // 473-477) are missing from this listing - confirm against the full file.

 449         // use a real executor

 450         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();

 452         // trigger timeout very quickly

 453         oper = new MyOper() {

 455             protected long getTimeoutMs(Integer timeoutSec) {

 460             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {

                     // NOTE(review): outcome2 is created and set to SUCCESS, but the visible
                     // handleAsync() below returns "outcome", not "outcome2" - check whether
                     // outcome2 is used on the missing lines or is dead code.
 462                 OperationOutcome outcome2 = params.makeOutcome();

 463                 outcome2.setResult(PolicyResult.SUCCESS);

 466                  * Create an incomplete future that will timeout after the operation's

 467                  * timeout. If it fires before the other timer, then it will return a

 470                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();

                     // never completed normally: only the 1-second orTimeout() can finish it
 471                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,

 472                                 params.getExecutor());

             // blocks until the operation finishes on the real executor
 478         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
 
 482      * Tests retry functions, when the count is set to zero and retries are exhausted.
 
 485     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
             // Retry count of zero with 10 allowed failures: expects a FAILURE result.

 486         params = params.toBuilder().retry(0).build();

 488         // new params, thus need a new operation
             // NOTE(review): embedded numbers jump 488 -> 491; the statement that rebuilds
             // "oper" is presumably on a missing line - confirm against the full file.

 491         oper.setMaxFailures(10);

 493         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
 
 497      * Tests retry functions, when the count is null and retries are exhausted.
 
 500     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
             // Null retry count with 10 allowed failures: expects a FAILURE result.

 501         params = params.toBuilder().retry(null).build();

 503         // new params, thus need a new operation
             // NOTE(review): embedded numbers jump 503 -> 506; the statement that rebuilds
             // "oper" is presumably on a missing line - confirm against the full file.

 506         oper.setMaxFailures(10);

 508         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
 
 512      * Tests retry functions, when retries are exhausted.
 
 515     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
             // Three retries with 10 allowed failures: expects FAILURE_RETRIES, with both
             // expected counts equal to maxRetries + 1.

 516         final int maxRetries = 3;

 517         params = params.toBuilder().retry(maxRetries).build();

 519         // new params, thus need a new operation
             // NOTE(review): embedded numbers jump 519 -> 522; the statement that rebuilds
             // "oper" is presumably on a missing line - confirm against the full file.

 522         oper.setMaxFailures(10);

 524         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,

 525                         PolicyResult.FAILURE_RETRIES);
 
 529      * Tests retry functions, when a success follows some retries.
 
 532     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
             // Ten retries allowed but only three failures configured: expects SUCCESS, with
             // both expected counts equal to maxFailures + 1.

 533         params = params.toBuilder().retry(10).build();

 535         // new params, thus need a new operation
             // NOTE(review): embedded numbers jump 535 -> 538; the statement that rebuilds
             // "oper" is presumably on a missing line - confirm against the full file.

 538         final int maxFailures = 3;

 539         oper.setMaxFailures(maxFailures);

 541         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,

 542                         PolicyResult.SUCCESS);
 
 546      * Tests retry functions, when the outcome is {@code null}.
 
 549     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
             // Overrides doOperation() and expects the run to end with a FAILURE result.

 551         // arrange to return null from doOperation()

 552         oper = new MyOper() {

 554             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {

                     // NOTE(review): embedded numbers jump 554 -> 557 -> 562; the rest of this
                     // override (including its return statement) is not visible in this listing.
 557                 super.doOperation(attempt, outcome);

             // unlike the other callers, this run passes an extra noop() argument
 562         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, noop());
 
 566     public void testSleep() throws Exception {
             // Checks that non-positive sleeps are already complete, and that a 100 ms sleep
             // is still pending after a 10 ms sleep and takes at least 99 ms overall.

             // negative duration: the future is already done
 567         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);

 568         assertTrue(future.isDone());

 569         assertNull(future.get());

             // zero duration: the future is already done
 572         future = oper.sleep(0, TimeUnit.SECONDS);

 573         assertTrue(future.isDone());

 574         assertNull(future.get());

 577          * Start a second sleep we can use to check the first while it's running.

 579         tstart = Instant.now();

 580         future = oper.sleep(100, TimeUnit.MILLISECONDS);

 582         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);

 584         // wait for second to complete and verify that the first has not completed
             // NOTE(review): embedded numbers jump 584 -> 586; the wait on future2 is
             // presumably on the missing line - confirm against the full file.

 586         assertFalse(future.isDone());

 588         // wait for the first (100 ms) sleep to complete
             // NOTE(review): embedded numbers jump 588 -> 591; the wait on "future" is
             // presumably on a missing line - confirm against the full file.

             // 99 rather than 100 leaves 1 ms of slack for clock granularity
 591         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();

 592         assertTrue(diff >= 99);
 
 596     public void testIsSameOperation() {
 
 597         assertFalse(oper.isSameOperation(null));
 
 599         OperationOutcome outcome = params.makeOutcome();
 
 601         // wrong actor - should be false
 
 602         outcome.setActor(null);
 
 603         assertFalse(oper.isSameOperation(outcome));
 
 604         outcome.setActor(MY_SINK);
 
 605         assertFalse(oper.isSameOperation(outcome));
 
 606         outcome.setActor(ACTOR);
 
 608         // wrong operation - should be null
 
 609         outcome.setOperation(null);
 
 610         assertFalse(oper.isSameOperation(outcome));
 
 611         outcome.setOperation(MY_SINK);
 
 612         assertFalse(oper.isSameOperation(outcome));
 
 613         outcome.setOperation(OPERATION);
 
 615         assertTrue(oper.isSameOperation(outcome));
 
 619      * Tests handleFailure() when the outcome is a success.
 
 622     public void testHandlePreprocessorFailureSuccess() {
 
 623         oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
 
 624         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
 
 628      * Tests handleFailure() when the outcome is <i>not</i> a success.
 
 631     public void testHandlePreprocessorFailureFailed() throws Exception {
 
 632         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
 
 633         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
 
 637      * Tests handleFailure() when the outcome is {@code null}.
 
 640     public void testHandlePreprocessorFailureNull() throws Exception {
 
 641         // arrange to return a null outcome from the preprocessor
 
 642         oper.setPreProc(CompletableFuture.completedFuture(null));
 
 643         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
 
 647     public void testFromException() {
 
 648         // arrange to generate an exception when operation runs
 
 649         oper.setGenException(true);
 
 651         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
 
 655      * Tests fromException() when there is no exception.
 
 658     public void testFromExceptionNoExcept() {
 
 659         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
 
 663      * Tests both flavors of anyOf(), because one invokes the other.
 
 666     public void testAnyOf() throws Exception {
             // Exercises anyOf() with one completed task among incomplete (and null) ones, in
             // list form and array form, and with the completed task in each position.

 667         // first task completes, others do not

 668         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();

 670         final OperationOutcome outcome = params.makeOutcome();

 672         tasks.add(() -> CompletableFuture.completedFuture(outcome));

 673         tasks.add(() -> new CompletableFuture<>());

             // a supplier that yields null instead of a future
 674         tasks.add(() -> null);

 675         tasks.add(() -> new CompletableFuture<>());

 677         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);

 678         assertTrue(executor.runAll(MAX_REQUESTS));

 679         assertTrue(result.isDone());

 680         assertSame(outcome, result.get());

 682         // repeat using array form

 683         @SuppressWarnings("unchecked")

 684         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];

 685         result = oper.anyOf(tasks.toArray(taskArray));

 686         assertTrue(executor.runAll(MAX_REQUESTS));

 687         assertTrue(result.isDone());

 688         assertSame(outcome, result.get());

 690         // second task completes, others do not
             // NOTE(review): embedded numbers jump 690 -> 692; presumably the task list is
             // emptied on the missing line before being refilled - confirm.

 692         tasks.add(() -> new CompletableFuture<>());

 693         tasks.add(() -> CompletableFuture.completedFuture(outcome));

 694         tasks.add(() -> new CompletableFuture<>());

 696         result = oper.anyOf(tasks);

 697         assertTrue(executor.runAll(MAX_REQUESTS));

 698         assertTrue(result.isDone());

 699         assertSame(outcome, result.get());

 701         // third task completes, others do not
             // NOTE(review): embedded numbers jump 701 -> 703; same presumed list reset as
             // above - confirm.

 703         tasks.add(() -> new CompletableFuture<>());

 704         tasks.add(() -> new CompletableFuture<>());

 705         tasks.add(() -> CompletableFuture.completedFuture(outcome));

 707         result = oper.anyOf(tasks);

 708         assertTrue(executor.runAll(MAX_REQUESTS));

 709         assertTrue(result.isDone());

 710         assertSame(outcome, result.get());
 
 714      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
 
 717     @SuppressWarnings("unchecked")
 
 718     public void testAnyOfEdge() throws Exception {
 
 719         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 721         // zero items: check both using a list and using an array
 
 722         assertNull(oper.anyOf(tasks));
 
 723         assertNull(oper.anyOf());
 
 725         // one item: : check both using a list and using an array
 
 726         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 727         tasks.add(() -> future1);
 
 729         assertSame(future1, oper.anyOf(tasks));
 
 730         assertSame(future1, oper.anyOf(() -> future1));
 
 734     public void testAllOfArray() throws Exception {
 
 735         final OperationOutcome outcome = params.makeOutcome();
 
 737         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 738         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 739         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 741         @SuppressWarnings("unchecked")
 
 742         CompletableFuture<OperationOutcome> result =
 
 743                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
 
 745         assertTrue(executor.runAll(MAX_REQUESTS));
 
 746         assertFalse(result.isDone());
 
 747         future1.complete(outcome);
 
 749         // complete 3 before 2
 
 750         assertTrue(executor.runAll(MAX_REQUESTS));
 
 751         assertFalse(result.isDone());
 
 752         future3.complete(outcome);
 
 754         assertTrue(executor.runAll(MAX_REQUESTS));
 
 755         assertFalse(result.isDone());
 
 756         future2.complete(outcome);
 
 758         // all of them are now done
 
 759         assertTrue(executor.runAll(MAX_REQUESTS));
 
 760         assertTrue(result.isDone());
 
 761         assertSame(outcome, result.get());
 
 765     public void testAllOfList() throws Exception {
 
 766         final OperationOutcome outcome = params.makeOutcome();
 
 768         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 769         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 770         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 772         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 773         tasks.add(() -> future1);
 
 774         tasks.add(() -> future2);
 
 775         tasks.add(() -> null);
 
 776         tasks.add(() -> future3);
 
 778         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
 780         assertTrue(executor.runAll(MAX_REQUESTS));
 
 781         assertFalse(result.isDone());
 
 782         future1.complete(outcome);
 
 784         // complete 3 before 2
 
 785         assertTrue(executor.runAll(MAX_REQUESTS));
 
 786         assertFalse(result.isDone());
 
 787         future3.complete(outcome);
 
 789         assertTrue(executor.runAll(MAX_REQUESTS));
 
 790         assertFalse(result.isDone());
 
 791         future2.complete(outcome);
 
 793         // all of them are now done
 
 794         assertTrue(executor.runAll(MAX_REQUESTS));
 
 795         assertTrue(result.isDone());
 
 796         assertSame(outcome, result.get());
 
 800      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
 
 803     @SuppressWarnings("unchecked")
 
 804     public void testAllOfEdge() throws Exception {
 
 805         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 807         // zero items: check both using a list and using an array
 
 808         assertNull(oper.allOf(tasks));
 
 809         assertNull(oper.allOf());
 
 811         // one item: : check both using a list and using an array
 
 812         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 813         tasks.add(() -> future1);
 
 815         assertSame(future1, oper.allOf(tasks));
 
 816         assertSame(future1, oper.allOf(() -> future1));
 
 820     public void testAttachFutures() throws Exception {
             // Builds a task list in which one supplier throws, then checks that anyOf()
             // rethrows that exception and cancels only the futures created before the failure.

 821         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();

 823         // third task throws an exception during construction

 824         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();

 825         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();

 826         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();

 827         tasks.add(() -> future1);

 828         tasks.add(() -> future2);

             // NOTE(review): embedded numbers jump 828 -> 830 -> 832; the lines that open and
             // close the throwing supplier are not visible in this listing.
 830             throw new IllegalStateException(EXPECTED_EXCEPTION);

 832         tasks.add(() -> future3);

 834         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);

 836         // should have canceled the first two, but not the last

 837         assertTrue(future1.isCancelled());

 838         assertTrue(future2.isCancelled());

 839         assertFalse(future3.isCancelled());
 
 843     public void testCombineOutcomes() throws Exception {
             // Checks how allOf() combines outcomes: via verifyOutcomes() for result mixes,
             // then directly for a null outcome and for a task that fails with an exception.

 845         verifyOutcomes(0, PolicyResult.SUCCESS);

 846         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);

 848         // maximum is in different positions

 849         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);

 850         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);

 851         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);

 853         // null outcome - takes precedence over a success

 854         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();

 855         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));

 856         tasks.add(() -> CompletableFuture.completedFuture(null));

 857         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));

 858         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);

 860         assertTrue(executor.runAll(MAX_REQUESTS));

 861         assertTrue(result.isDone());

 862         assertNull(result.get());

 864         // one throws an exception during execution

 865         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);

             // NOTE(review): embedded numbers jump 865 -> 868; presumably the task list is
             // emptied on a missing line before being refilled - confirm.
 868         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));

 869         tasks.add(() -> CompletableFuture.failedFuture(except));

 870         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));

 871         result = oper.allOf(tasks);

 873         assertTrue(executor.runAll(MAX_REQUESTS));

 874         assertTrue(result.isCompletedExceptionally());

             // NOTE(review): an AssertionError thrown inside a whenComplete() action is captured
             // by the future that whenComplete() returns, which is discarded here, so this check
             // cannot fail the test. Consider asserting on the cause of the ExecutionException
             // thrown by result.get() instead.
 875         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
 
 879      * Tests both flavors of sequence(), because one invokes the other.
 
 882     public void testSequence() throws Exception {
             // Runs sequence() over a list of tasks, then over the same tasks in array form,
             // and finally over a list that contains a failing task.
 
 883         final OperationOutcome outcome = params.makeOutcome();
 
 885         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 886         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
             // this supplier yields no future at all; the sequence is still expected to finish
 887         tasks.add(() -> null);
 
 888         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 889         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 891         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
 
 892         assertTrue(executor.runAll(MAX_REQUESTS));
 
 893         assertTrue(result.isDone());
 
 894         assertSame(outcome, result.get());
 
 896         // repeat using array form
 
 897         @SuppressWarnings("unchecked")
 
 898         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
 
 899         result = oper.sequence(tasks.toArray(taskArray));
 
 900         assertTrue(executor.runAll(MAX_REQUESTS));
 
 901         assertTrue(result.isDone());
 
 902         assertSame(outcome, result.get());
 
 904         // second task fails, third should not run
 
 905         OperationOutcome failure = params.makeOutcome();
 
 906         failure.setResult(PolicyResult.FAILURE);
 
 908         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 909         tasks.add(() -> CompletableFuture.completedFuture(failure));
 
 910         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
             // NOTE(review): only the returned outcome is verified below; no visible assertion
             // confirms that the task following the failure was actually skipped
 912         result = oper.sequence(tasks);
 
 913         assertTrue(executor.runAll(MAX_REQUESTS));
 
 914         assertTrue(result.isDone());
 
             // the failing outcome itself is expected as the result of the whole sequence
 915         assertSame(failure, result.get());
 
 919      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
 
 922     @SuppressWarnings("unchecked")
 
 923     public void testSequenceEdge() throws Exception {
             // Covers the degenerate inputs of sequence(): no tasks and exactly one task.
 
 924         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 926         // zero items: check both using a list and using an array
 
             // with nothing to run, sequence() is expected to return null rather than a future
 927         assertNull(oper.sequence(tasks));
 
 928         assertNull(oper.sequence());
 
 930         // one item: check both using a list and using an array
 
 931         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 932         tasks.add(() -> future1);
 
             // with a single task, the very future supplied by that task is expected back
 934         assertSame(future1, oper.sequence(tasks));
 
 935         assertSame(future1, oper.sequence(() -> future1));
 
 938     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
             // Builds one already-completed task per entry in results, hands the tasks to
             // allOf() and asserts that the combined future yields the outcome that was
             // created for index expected.
             //   expected - index, within results, of the outcome that allOf() should return
             //   results  - result to assign to the outcome of each task, in task order
 
 939         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 941         OperationOutcome expectedOutcome = null;
 
 943         for (int count = 0; count < results.length; ++count) {
 
 944             OperationOutcome outcome = params.makeOutcome();
 
 945             outcome.setResult(results[count]);
 
 946             tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
                 // remember the outcome object that allOf() is expected to pick
 948             if (count == expected) {
 
 949                 expectedOutcome = outcome;
 
 953         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
 955         assertTrue(executor.runAll(MAX_REQUESTS));
 
 956         assertTrue(result.isDone());
 
             // identity check: the same outcome instance must come back, not just an equal one
 957         assertSame(expectedOutcome, result.get());
 
 961     public void testDetmPriority() throws CoderException {
             // Checks the priority that detmPriority() assigns to a null outcome, to each
             // result listed in the map below, and to an outcome whose result is null.
             // Both null cases are expected to yield priority 1.
 
 962         assertEquals(1, oper.detmPriority(null));
 
 964         OperationOutcome outcome = params.makeOutcome();
 
             // expected priority for each result
 966         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
 
 967                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
 
 968                         PolicyResult.FAILURE_EXCEPTION, 6);
 
 970         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
 
 971             outcome.setResult(ent.getKey());
 
                 // the result name is used as the assertion message to identify a failing entry
 972             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
 
 976          * Test null result. We can't actually set it to null, because the set() method
 
 977          * won't allow it. Instead, we decode it from a structure.
 
 979         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
 
 980         assertEquals(1, oper.detmPriority(outcome));
 
 984      * Tests callbackStarted() when the pipeline has already been stopped.
 
 987     public void testCallbackStartedNotRunning() {
             // Cancels the operation future from within the start callback and then checks
             // how many times the start callback was counted.
 
             // holder that lets the callback reach the future, which is only assigned further
             // down, once start() has returned
 988         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
 
 991          * arrange to stop the controller when the start-callback is invoked, but capture
 
 994         params = params.toBuilder().startCallback(oper -> {
 
 996             future.get().cancel(false);
 
 999         // new params, thus need a new operation
 
1000         oper = new MyOper();
 
1002         future.set(oper.start());
 
1003         assertTrue(executor.runAll(MAX_REQUESTS));
 
1005         // should have only run once
 
1006         assertEquals(1, numStart);
 
1010      * Tests callbackCompleted() when the pipeline has already been stopped.
 
1013     public void testCallbackCompletedNotRunning() {
             // Cancels the operation future from within the start callback and then checks
             // that no completion was counted.
 
             // holder that lets the callback reach the future, which is only assigned further
             // down, once start() has returned
1014         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
 
1016         // arrange to stop the controller when the start-callback is invoked
 
1017         params = params.toBuilder().startCallback(oper -> {
 
1018             future.get().cancel(false);
 
1021         // new params, thus need a new operation
 
1022         oper = new MyOper();
 
1024         future.set(oper.start());
 
1025         assertTrue(executor.runAll(MAX_REQUESTS));
 
1027         // should not have been set
 
             // the end-callback counter must still be zero
1029         assertEquals(0, numEnd);
 
1033     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
             // Checks how setOutcome(outcome, throwable) fills in the message and result for
             // a timeout and for any other exception.
 
             // a timeout wrapped in a CompletionException
1034         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
 
1036         OperationOutcome outcome;
 
1038         outcome = new OperationOutcome();
 
1039         oper.setOutcome(outcome, timex);
 
1040         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1041         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
 
             // a non-timeout exception gets the same message but a different result
1043         outcome = new OperationOutcome();
 
1044         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
 
1045         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1046         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
 
1050     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
             // Checks the message and result that setOutcome(outcome, result) stores for
             // SUCCESS and for every entry of FAILURE_RESULTS.
 
1051         OperationOutcome outcome;
 
1053         outcome = new OperationOutcome();
 
1054         oper.setOutcome(outcome, PolicyResult.SUCCESS);
 
1055         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
 
1056         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
 
             // apply SUCCESS a second time to the already-populated outcome; the same message
             // and result are expected
1058         oper.setOutcome(outcome, PolicyResult.SUCCESS);
 
1059         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
 
1060         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
 
             // each failure result must yield FAILED_MSG and be stored unchanged; the result
             // name is used as the assertion message
1062         for (PolicyResult result : FAILURE_RESULTS) {
 
1063             outcome = new OperationOutcome();
 
1064             oper.setOutcome(outcome, result);
 
1065             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1066             assertEquals(result.toString(), result, outcome.getResult());
 
1071     public void testIsTimeout() {
             // Checks which throwables isTimeout() recognises as a timeout.
 
1072         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
 
             // not timeouts: an unrelated exception, a timeout that is merely the cause of a
             // non-CompletionException (with or without an outer CompletionException), a
             // CompletionException without a cause, and a timeout wrapped in two
             // CompletionExceptions
1074         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
 
1075         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
 
1076         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
 
1077         assertFalse(oper.isTimeout(new CompletionException(null)));
 
1078         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
 
             // timeouts: the bare exception, or one wrapped in exactly one CompletionException
1080         assertTrue(oper.isTimeout(timex));
 
1081         assertTrue(oper.isTimeout(new CompletionException(timex)));
 
1085     public void testLogMessage() {
             // Checks the log output of logMessage() for a structured payload, a plain string,
             // a null payload, and a payload that the coder fails to encode.
 
1086         final String infraStr = SINK_INFRA.toString();
 
1088         // log structured data
 
1089         appender.clearExtractions();
 
1090         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
 
1091         List<String> output = appender.getExtracted();
 
1092         assertEquals(1, output.size());
 
             // the payload is expected to appear as multi-line, indented JSON
1094         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
 
1095                         .contains("{\n  \"text\": \"my-text\"\n}");
 
1097         // repeat with a response
 
1098         appender.clearExtractions();
 
1099         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
 
1100         output = appender.getExtracted();
 
1101         assertEquals(1, output.size());
 
1103         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
 
1104                         .contains("{\n  \"text\": \"my-text\"\n}");
 
1106         // log a plain string
 
1107         appender.clearExtractions();
 
1108         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
 
1109         output = appender.getExtracted();
 
1110         assertEquals(1, output.size());
 
1111         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
 
1113         // log a null request
 
1114         appender.clearExtractions();
 
1115         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
 
1116         output = appender.getExtracted();
 
1117         assertEquals(1, output.size());
 
1119         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
 
1121         // generate exception from coder
 
             // from here on, oper is an operation whose coder always fails to encode
1122         setOperCoderException();
 
1124         appender.clearExtractions();
 
1125         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
 
1126         output = appender.getExtracted();
 
             // two entries are expected: the encoding failure first, then the message itself
1127         assertEquals(2, output.size());
 
1128         assertThat(output.get(0)).contains("cannot pretty-print request");
 
1129         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
 
1131         // repeat with a response
 
1132         appender.clearExtractions();
 
1133         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
 
1134         output = appender.getExtracted();
 
1135         assertEquals(2, output.size());
 
             // for the IN direction the failure text says response instead of request
1136         assertThat(output.get(0)).contains("cannot pretty-print response");
 
1137         assertThat(output.get(1)).contains(MY_SOURCE);
 
1141     public void testGetRetry() {
             // A null retry count is expected to map to 0; a non-null count is returned as is.
 
1142         assertEquals(0, oper.getRetry(null));
 
1143         assertEquals(10, oper.getRetry(10));
 
1147     public void testGetRetryWait() {
             // An OperationPartial without any overrides should report the default retry wait.
 
1148         // need an operator that doesn't override the retry time
 
1149         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
 
1150         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
 
1154     public void testGetTimeOutMs() {
             // Checks the conversion done by getTimeoutMs() for a configured timeout and for
             // a null timeout.
 
             // the timeout in seconds is expected to come back multiplied by 1000
1155         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
 
1157         params = params.toBuilder().timeoutSec(null).build();
 
1159         // new params, thus need a new operation
 
1160         oper = new MyOper();
 
             // a null timeout is expected to map to 0
1162         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
 
1165     private void starter(OperationOutcome oper) {
             // Records the start time carried by the given outcome in tstart, which
             // verifyRun() later compares against the recorded outcomes.
             // Note: the parameter is an outcome and shadows the oper field used by the tests.
 
1167         tstart = oper.getStart();
 
1172     private void completer(OperationOutcome oper) {
             // NOTE(review): judging by the name and by the numEnd, opend and ends values that
             // verifyRun() checks, this presumably records completion outcomes as the
             // counterpart of starter() - TODO confirm against the full method body.
 
1179      * Gets a function that does nothing.
 
1181      * @param <T> type of input parameter expected by the function
 
1182      * @return a function that does nothing
 
1184     private <T> Consumer<T> noop() {
             // Used by the four-argument verifyRun() as the manipulator, so that the future
             // returned by start() is left untouched.
 
1189     private OperationOutcome makeSuccess() {
             // Creates an outcome from the current params and marks it as SUCCESS.
 
1190         OperationOutcome outcome = params.makeOutcome();
 
1191         outcome.setResult(PolicyResult.SUCCESS);
 
1196     private OperationOutcome makeFailure() {
             // Creates an outcome from the current params and marks it as FAILURE.
 
1197         OperationOutcome outcome = params.makeOutcome();
 
1198         outcome.setResult(PolicyResult.FAILURE);
 
1206      * @param testName test name
 
1207      * @param expectedCallbacks number of callbacks expected
 
1208      * @param expectedOperations number of operation invocations expected
 
1209      * @param expectedResult expected outcome
 
1211     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
 
1212                     PolicyResult expectedResult) {
             // Convenience form: delegates to the five-argument verifyRun() with a manipulator
             // that does nothing to the future.
 
1214         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
 
1220      * @param testName test name
 
1221      * @param expectedCallbacks number of callbacks expected
 
1222      * @param expectedOperations number of operation invocations expected
 
1223      * @param expectedResult expected outcome
 
1224      * @param manipulator function to modify the future returned by
 
1225      *        {@link OperationPartial#start()} before the tasks
 
1226      *        in the executor are run
 
1228     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
 
1229                     Consumer<CompletableFuture<OperationOutcome>> manipulator) {
             // Starts the operation, lets the manipulator act on the returned future, runs the
             // queued executor tasks, and then checks the callback counts, the recorded
             // outcomes and the operation count. testName is used as the assertion message.
 
1237         CompletableFuture<OperationOutcome> future = oper.start();
 
             // give the caller a chance to act on the future before any queued task runs
1239         manipulator.accept(future);
 
1241         assertTrue(testName, executor.runAll(MAX_REQUESTS));
 
             // start and end callbacks are expected to have fired the same number of times
1243         assertEquals(testName, expectedCallbacks, numStart);
 
1244         assertEquals(testName, expectedCallbacks, numEnd);
 
1246         if (expectedCallbacks > 0) {
 
1247             assertNotNull(testName, opstart);
 
1248             assertNotNull(testName, opend);
 
1249             assertEquals(testName, expectedResult, opend.getResult());
 
                 // the start time captured in tstart must be the very same object in both
                 // the recorded start outcome and the recorded end outcome
1251             assertSame(testName, tstart, opstart.getStart());
 
1252             assertSame(testName, tstart, opend.getStart());
 
                     // the future must already be complete and yield an outcome equal to opend
1255                 assertTrue(future.isDone());
 
1256                 assertEquals(testName, opend, future.get());
 
1258                 // "start" is never final
 
1259                 for (OperationOutcome outcome : starts) {
 
1260                     assertFalse(testName, outcome.isFinalOutcome());
 
1263                 // only the last "complete" is final
 
                     // removeLast() also takes that entry out of ends, so the loop below only
                     // visits the earlier completions
1264                 assertTrue(testName, ends.removeLast().isFinalOutcome());
 
1266                 for (OperationOutcome outcome : ends) {
 
1267                     assertFalse(outcome.isFinalOutcome());
 
1270             } catch (InterruptedException | ExecutionException e) {
 
                     // failures from future.get() are rethrown unchecked with the cause kept
1271                 throw new IllegalStateException(e);
 
                 // when operations were expected, both recorded outcomes must carry the
                 // sub-request id of the operation
1274             if (expectedOperations > 0) {
 
1275                 assertNotNull(testName, oper.getSubRequestId());
 
1276                 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
 
1277                 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
 
             // finally, compare the count reported by the operation with the expected number
             // of operation invocations
1281         assertEquals(testName, expectedOperations, oper.getCount());
 
1285      * Creates a new {@link #oper} whose coder will throw an exception.
 
1287     private void setOperCoderException() {
             // Replaces oper with a MyOper whose getCoder() supplies a coder that cannot encode.
 
1288         oper = new MyOper() {
 
1290             protected Coder getCoder() {
 
1291                 return new StandardCoder() {
 
                         // every call to this two-argument encode() fails with a CoderException
1293                     public String encode(Object object, boolean pretty) throws CoderException {
 
1294                         throw new CoderException(EXPECTED_EXCEPTION);
 
1303     public static class MyData {
             // Sample payload passed to logMessage() in testLogMessage(), which expects it to
             // be logged as JSON containing a text property.
 
1304         private String text = TEXT;
 
1308     private class MyOper extends OperationPartial {
 
1310         private int count = 0;
 
1313         private boolean genException;
 
1315         private int maxFailures = 0;
 
1317         private CompletableFuture<OperationOutcome> preProc;
 
1321             super(OperationPartialTest.this.params, config, PROP_NAMES);
 
1325         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
                 // Test implementation of the operation: tags the outcome with the attempt
                 // number and reports SUCCESS or FAILURE depending on count and maxFailures.
 
                 // NOTE(review): presumably thrown only when genException is set - confirm
1328                 throw new IllegalStateException(EXPECTED_EXCEPTION);
 
                 // the sub-request id is simply the attempt number as a string
1331             operation.setSubRequestId(String.valueOf(attempt));
 
                 // succeed once count exceeds maxFailures, otherwise report a failure
1333             if (count > maxFailures) {
 
1334                 operation.setResult(PolicyResult.SUCCESS);
 
1336                 operation.setResult(PolicyResult.FAILURE);
 
1343         protected long getRetryWaitMs() {
                 // Replaces the retry wait of OperationPartial for the tests; the reason is
                 // given in the text that follows.
 
1345              * Sleep timers run in the background, but we want to control things via the
 
1346              * "executor", thus we avoid sleep timers altogether by simply returning 0.
 
1352         protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
                 // Returns the preProc future when one has been set; otherwise falls back to
                 // the superclass implementation.
 
1353             return (preProc != null ? preProc : super.startPreprocessorAsync());