2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import static org.assertj.core.api.Assertions.assertThat;
 
  24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 
  25 import static org.junit.Assert.assertEquals;
 
  26 import static org.junit.Assert.assertFalse;
 
  27 import static org.junit.Assert.assertNotNull;
 
  28 import static org.junit.Assert.assertNull;
 
  29 import static org.junit.Assert.assertSame;
 
  30 import static org.junit.Assert.assertTrue;
 
  31 import static org.mockito.ArgumentMatchers.any;
 
  32 import static org.mockito.Mockito.verify;
 
  33 import static org.mockito.Mockito.when;
 
  35 import ch.qos.logback.classic.Logger;
 
  36 import java.time.Instant;
 
  37 import java.util.ArrayDeque;
 
  38 import java.util.Arrays;
 
  39 import java.util.Collections;
 
  40 import java.util.Deque;
 
  41 import java.util.LinkedList;
 
  42 import java.util.List;
 
  44 import java.util.Map.Entry;
 
  45 import java.util.UUID;
 
  46 import java.util.concurrent.CompletableFuture;
 
  47 import java.util.concurrent.CompletionException;
 
  48 import java.util.concurrent.ExecutionException;
 
  49 import java.util.concurrent.ForkJoinPool;
 
  50 import java.util.concurrent.Future;
 
  51 import java.util.concurrent.TimeUnit;
 
  52 import java.util.concurrent.TimeoutException;
 
  53 import java.util.concurrent.atomic.AtomicReference;
 
  54 import java.util.function.Consumer;
 
  55 import java.util.function.Supplier;
 
  56 import java.util.stream.Collectors;
 
  59 import org.junit.AfterClass;
 
  60 import org.junit.Before;
 
  61 import org.junit.BeforeClass;
 
  62 import org.junit.Test;
 
  63 import org.mockito.ArgumentCaptor;
 
  64 import org.mockito.Mock;
 
  65 import org.mockito.MockitoAnnotations;
 
  66 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 
  67 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 
  68 import org.onap.policy.common.utils.coder.Coder;
 
  69 import org.onap.policy.common.utils.coder.CoderException;
 
  70 import org.onap.policy.common.utils.coder.StandardCoder;
 
  71 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
 
  72 import org.onap.policy.common.utils.time.PseudoExecutor;
 
  73 import org.onap.policy.controlloop.ControlLoopOperation;
 
  74 import org.onap.policy.controlloop.VirtualControlLoopEvent;
 
  75 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
 
  76 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 
  77 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 
  78 import org.onap.policy.controlloop.actorserviceprovider.Operator;
 
  79 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
 
  80 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  81 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
 
  82 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
 
  83 import org.onap.policy.controlloop.policy.PolicyResult;
 
  84 import org.slf4j.LoggerFactory;
 
  86 public class OperationPartialTest {
 
  87     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
 
  88     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
 
  89     private static final int MAX_REQUESTS = 100;
 
  90     private static final int MAX_PARALLEL = 10;
 
  91     private static final String EXPECTED_EXCEPTION = "expected exception";
 
  92     private static final String ACTOR = "my-actor";
 
  93     private static final String OPERATION = "my-operation";
 
  94     private static final String MY_SINK = "my-sink";
 
  95     private static final String MY_SOURCE = "my-source";
 
  96     private static final String MY_TARGET_ENTITY = "my-entity";
 
  97     private static final String TEXT = "my-text";
 
  98     private static final int TIMEOUT = 1000;
 
  99     private static final UUID REQ_ID = UUID.randomUUID();
 
 101     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
 
 102                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
 
 105      * Used to attach an appender to the class' logger.
 
 107     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
 
 108     private static final ExtractAppender appender = new ExtractAppender();
 
 110     private static final List<String> PROP_NAMES = List.of("hello", "world");
 
 113     private ActorService service;
 
 115     private Actor guardActor;
 
 117     private Operator guardOperator;
 
 119     private Operation guardOperation;
 
 121     private VirtualControlLoopEvent event;
 
 122     private ControlLoopEventContext context;
 
 123     private PseudoExecutor executor;
 
 124     private ControlLoopOperationParams params;
 
 128     private int numStart;
 
 131     private Instant tstart;
 
 133     private OperationOutcome opstart;
 
 134     private OperationOutcome opend;
 
 136     private Deque<OperationOutcome> starts;
 
 137     private Deque<OperationOutcome> ends;
 
 139     private OperatorConfig config;
 
 142      * Attaches the appender to the logger.
 
 145     public static void setUpBeforeClass() throws Exception {
 
 147          * Attach appender to the logger.
 
 149         appender.setContext(logger.getLoggerContext());
 
 152         logger.addAppender(appender);
 
 156      * Stops the appender.
 
 159     public static void tearDownAfterClass() {
 
 164      * Initializes the fields, including {@link #oper}.
 
 167     public void setUp() {
 
 168         MockitoAnnotations.initMocks(this);
 
 170         event = new VirtualControlLoopEvent();
 
 171         event.setRequestId(REQ_ID);
 
 173         context = new ControlLoopEventContext(event);
 
 174         executor = new PseudoExecutor();
 
 176         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
 
 177                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
 
 178                         .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
 
 180         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
 
 181         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
 
 182         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
 
 183         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
 
 185         config = new OperatorConfig(executor);
 
 194         starts = new ArrayDeque<>(10);
 
 195         ends = new ArrayDeque<>(10);
 
 199     public void testOperatorPartial_testGetActorName_testGetName() {
 
 200         assertEquals(ACTOR, oper.getActorName());
 
 201         assertEquals(OPERATION, oper.getName());
 
 202         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
 
 206     public void testGetBlockingThread() throws Exception {
 
 207         CompletableFuture<Void> future = new CompletableFuture<>();
 
 209         // use the real executor
 
 210         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
 
 212             public Operation buildOperation(ControlLoopOperationParams params) {
 
 217         oper2.getBlockingExecutor().execute(() -> future.complete(null));
 
 219         assertNull(future.get(5, TimeUnit.SECONDS));
 
 223     public void testGetPropertyNames() {
 
 224         assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
 
 228     public void testGetProperty_testSetProperty() {
 
 229         oper.setProperty("propertyA", "valueA");
 
 230         oper.setProperty("propertyB", "valueB");
 
 231         oper.setProperty("propertyC", 20);
 
 233         assertEquals("valueA", oper.getProperty("propertyA"));
 
 234         assertEquals("valueB", oper.getProperty("propertyB"));
 
 235         assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
 
 239     public void testStart() {
 
 240         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
 
 244      * Tests start() with multiple running requests.
 
 247     public void testStartMultiple() {
 
 248         for (int count = 0; count < MAX_PARALLEL; ++count) {
 
 252         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
 
 254         assertNotNull(opstart);
 
 255         assertNotNull(opend);
 
 256         assertEquals(PolicyResult.SUCCESS, opend.getResult());
 
 258         assertEquals(MAX_PARALLEL, numStart);
 
 259         assertEquals(MAX_PARALLEL, oper.getCount());
 
 260         assertEquals(MAX_PARALLEL, numEnd);
 
 264      * Tests startPreprocessor() when the preprocessor returns a failure.
 
 267     public void testStartPreprocessorFailure() {
 
 268         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
 
 270         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
 
 274      * Tests startPreprocessor() when the preprocessor throws an exception.
 
 277     public void testStartPreprocessorException() {
 
 278         // arrange for the preprocessor to throw an exception
 
 279         oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
 
 281         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
 
 285      * Tests startPreprocessor() when the pipeline is not running.
 
 288     public void testStartPreprocessorNotRunning() {
 
 289         // arrange for the preprocessor to return success, which will be ignored
 
 290         // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
 
 292         oper.start().cancel(false);
 
 293         assertTrue(executor.runAll(MAX_REQUESTS));
 
 298         assertEquals(0, numStart);
 
 299         assertEquals(0, oper.getCount());
 
 300         assertEquals(0, numEnd);
 
 304      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
 
 307     public void testStartPreprocessorBuilderException() {
 
 308         oper = new MyOper() {
 
 310             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
 
 311                 throw new IllegalStateException(EXPECTED_EXCEPTION);
 
 315         assertThatIllegalStateException().isThrownBy(() -> oper.start());
 
 317         // should be nothing in the queue
 
 318         assertEquals(0, executor.getQueueLength());
 
 322     public void testStartPreprocessorAsync() {
 
 323         assertNull(oper.startPreprocessorAsync());
 
 327     public void testStartGuardAsync() throws Exception {
 
 328         CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
 
 329         assertTrue(future.isDone());
 
 330         assertEquals(PolicyResult.SUCCESS, future.get().getResult());
 
 332         // verify the parameters that were passed
 
 333         ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
 
 334                         ArgumentCaptor.forClass(ControlLoopOperationParams.class);
 
 335         verify(guardOperator).buildOperation(paramsCaptor.capture());
 
 337         params = paramsCaptor.getValue();
 
 338         assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
 
 339         assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
 
 340         assertNull(params.getRetry());
 
 341         assertNull(params.getTimeoutSec());
 
 343         Map<String, Object> payload = params.getPayload();
 
 344         assertNotNull(payload);
 
 346         assertEquals(oper.makeGuardPayload(), payload);
 
 350     public void testMakeGuardPayload() {
 
 351         Map<String, Object> payload = oper.makeGuardPayload();
 
 352         assertSame(REQ_ID, payload.get("requestId"));
 
 354         // request id changes, so remove it
 
 355         payload.remove("requestId");
 
 357         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity}", payload.toString());
 
 359         // repeat, but with closed loop name
 
 360         event.setClosedLoopControlName("my-loop");
 
 361         payload = oper.makeGuardPayload();
 
 362         payload.remove("requestId");
 
 363         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity, clname=my-loop}", payload.toString());
 
 367     public void testStartOperationAsync() {
 
 369         assertTrue(executor.runAll(MAX_REQUESTS));
 
 371         assertEquals(1, oper.getCount());
 
 375     public void testIsSuccess() {
 
 376         assertFalse(oper.isSuccess(null));
 
 378         OperationOutcome outcome = new OperationOutcome();
 
 380         outcome.setResult(PolicyResult.SUCCESS);
 
 381         assertTrue(oper.isSuccess(outcome));
 
 383         for (PolicyResult failure : FAILURE_RESULTS) {
 
 384             outcome.setResult(failure);
 
 385             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
 
 390     public void testIsActorFailed() {
 
 391         assertFalse(oper.isActorFailed(null));
 
 393         OperationOutcome outcome = params.makeOutcome();
 
 396         outcome.setResult(PolicyResult.SUCCESS);
 
 397         assertFalse(oper.isActorFailed(outcome));
 
 399         outcome.setResult(PolicyResult.FAILURE_RETRIES);
 
 400         assertFalse(oper.isActorFailed(outcome));
 
 403         outcome.setResult(PolicyResult.FAILURE);
 
 406         outcome.setActor(MY_SINK);
 
 407         assertFalse(oper.isActorFailed(outcome));
 
 408         outcome.setActor(null);
 
 409         assertFalse(oper.isActorFailed(outcome));
 
 410         outcome.setActor(ACTOR);
 
 412         // incorrect operation
 
 413         outcome.setOperation(MY_SINK);
 
 414         assertFalse(oper.isActorFailed(outcome));
 
 415         outcome.setOperation(null);
 
 416         assertFalse(oper.isActorFailed(outcome));
 
 417         outcome.setOperation(OPERATION);
 
 420         assertTrue(oper.isActorFailed(outcome));
 
 424     public void testDoOperation() {
 
 426          * Use an operation that doesn't override doOperation().
 
 428         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
 
 431         assertTrue(executor.runAll(MAX_REQUESTS));
 
 433         assertNotNull(opend);
 
 434         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
 
 438     public void testTimeout() throws Exception {
 
 440         // use a real executor
 
 441         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
 
 443         // trigger timeout very quickly
 
 444         oper = new MyOper() {
 
 446             protected long getTimeoutMs(Integer timeoutSec) {
 
 451             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
 453                 OperationOutcome outcome2 = params.makeOutcome();
 
 454                 outcome2.setResult(PolicyResult.SUCCESS);
 
 457                  * Create an incomplete future that will timeout after the operation's
 
 458                  * timeout. If it fires before the other timer, then it will return a
 
 461                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
 
 462                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
 
 463                                 params.getExecutor());
 
 469         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
 
 473      * Tests retry functions, when the count is set to zero and retries are exhausted.
 
 476     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
 
 477         params = params.toBuilder().retry(0).build();
 
 479         // new params, thus need a new operation
 
 482         oper.setMaxFailures(10);
 
 484         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
 
 488      * Tests retry functions, when the count is null and retries are exhausted.
 
 491     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
 
 492         params = params.toBuilder().retry(null).build();
 
 494         // new params, thus need a new operation
 
 497         oper.setMaxFailures(10);
 
 499         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
 
 503      * Tests retry functions, when retries are exhausted.
 
 506     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
 
 507         final int maxRetries = 3;
 
 508         params = params.toBuilder().retry(maxRetries).build();
 
 510         // new params, thus need a new operation
 
 513         oper.setMaxFailures(10);
 
 515         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
 
 516                         PolicyResult.FAILURE_RETRIES);
 
 520      * Tests retry functions, when a success follows some retries.
 
 523     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
 
 524         params = params.toBuilder().retry(10).build();
 
 526         // new params, thus need a new operation
 
 529         final int maxFailures = 3;
 
 530         oper.setMaxFailures(maxFailures);
 
 532         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
 
 533                         PolicyResult.SUCCESS);
 
 537      * Tests retry functions, when the outcome is {@code null}.
 
 540     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
 
 542         // arrange to return null from doOperation()
 
 543         oper = new MyOper() {
 
 545             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
 
 548                 super.doOperation(attempt, outcome);
 
 553         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, noop());
 
 557     public void testSleep() throws Exception {
 
 558         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
 
 559         assertTrue(future.isDone());
 
 560         assertNull(future.get());
 
 563         future = oper.sleep(0, TimeUnit.SECONDS);
 
 564         assertTrue(future.isDone());
 
 565         assertNull(future.get());
 
 568          * Start a second sleep we can use to check the first while it's running.
 
 570         tstart = Instant.now();
 
 571         future = oper.sleep(100, TimeUnit.MILLISECONDS);
 
 573         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
 
 575         // wait for second to complete and verify that the first has not completed
 
 577         assertFalse(future.isDone());
 
 579         // wait for second to complete
 
 582         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
 
 583         assertTrue(diff >= 99);
 
 587     public void testIsSameOperation() {
 
 588         assertFalse(oper.isSameOperation(null));
 
 590         OperationOutcome outcome = params.makeOutcome();
 
 592         // wrong actor - should be false
 
 593         outcome.setActor(null);
 
 594         assertFalse(oper.isSameOperation(outcome));
 
 595         outcome.setActor(MY_SINK);
 
 596         assertFalse(oper.isSameOperation(outcome));
 
 597         outcome.setActor(ACTOR);
 
 599         // wrong operation - should be null
 
 600         outcome.setOperation(null);
 
 601         assertFalse(oper.isSameOperation(outcome));
 
 602         outcome.setOperation(MY_SINK);
 
 603         assertFalse(oper.isSameOperation(outcome));
 
 604         outcome.setOperation(OPERATION);
 
 606         assertTrue(oper.isSameOperation(outcome));
 
 610      * Tests handleFailure() when the outcome is a success.
 
 613     public void testHandlePreprocessorFailureSuccess() {
 
 614         oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
 
 615         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
 
 619      * Tests handleFailure() when the outcome is <i>not</i> a success.
 
 622     public void testHandlePreprocessorFailureFailed() throws Exception {
 
 623         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
 
 624         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
 
 628      * Tests handleFailure() when the outcome is {@code null}.
 
 631     public void testHandlePreprocessorFailureNull() throws Exception {
 
 632         // arrange to return a null outcome from the preprocessor
 
 633         oper.setPreProc(CompletableFuture.completedFuture(null));
 
 634         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
 
 638     public void testFromException() {
 
 639         // arrange to generate an exception when operation runs
 
 640         oper.setGenException(true);
 
 642         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
 
 646      * Tests fromException() when there is no exception.
 
 649     public void testFromExceptionNoExcept() {
 
 650         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
 
 654      * Tests both flavors of anyOf(), because one invokes the other.
 
 657     public void testAnyOf() throws Exception {
 
 658         // first task completes, others do not
 
 659         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 661         final OperationOutcome outcome = params.makeOutcome();
 
 663         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 664         tasks.add(() -> new CompletableFuture<>());
 
 665         tasks.add(() -> null);
 
 666         tasks.add(() -> new CompletableFuture<>());
 
 668         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
 
 669         assertTrue(executor.runAll(MAX_REQUESTS));
 
 670         assertTrue(result.isDone());
 
 671         assertSame(outcome, result.get());
 
 673         // repeat using array form
 
 674         @SuppressWarnings("unchecked")
 
 675         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
 
 676         result = oper.anyOf(tasks.toArray(taskArray));
 
 677         assertTrue(executor.runAll(MAX_REQUESTS));
 
 678         assertTrue(result.isDone());
 
 679         assertSame(outcome, result.get());
 
 681         // second task completes, others do not
 
 683         tasks.add(() -> new CompletableFuture<>());
 
 684         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 685         tasks.add(() -> new CompletableFuture<>());
 
 687         result = oper.anyOf(tasks);
 
 688         assertTrue(executor.runAll(MAX_REQUESTS));
 
 689         assertTrue(result.isDone());
 
 690         assertSame(outcome, result.get());
 
 692         // third task completes, others do not
 
 694         tasks.add(() -> new CompletableFuture<>());
 
 695         tasks.add(() -> new CompletableFuture<>());
 
 696         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 698         result = oper.anyOf(tasks);
 
 699         assertTrue(executor.runAll(MAX_REQUESTS));
 
 700         assertTrue(result.isDone());
 
 701         assertSame(outcome, result.get());
 
 705      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
 
 708     @SuppressWarnings("unchecked")
 
 709     public void testAnyOfEdge() throws Exception {
 
 710         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 712         // zero items: check both using a list and using an array
 
 713         assertNull(oper.anyOf(tasks));
 
 714         assertNull(oper.anyOf());
 
 716         // one item: : check both using a list and using an array
 
 717         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 718         tasks.add(() -> future1);
 
 720         assertSame(future1, oper.anyOf(tasks));
 
 721         assertSame(future1, oper.anyOf(() -> future1));
 
 725     public void testAllOfArray() throws Exception {
 
 726         final OperationOutcome outcome = params.makeOutcome();
 
 728         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 729         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 730         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 732         @SuppressWarnings("unchecked")
 
 733         CompletableFuture<OperationOutcome> result =
 
 734                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
 
 736         assertTrue(executor.runAll(MAX_REQUESTS));
 
 737         assertFalse(result.isDone());
 
 738         future1.complete(outcome);
 
 740         // complete 3 before 2
 
 741         assertTrue(executor.runAll(MAX_REQUESTS));
 
 742         assertFalse(result.isDone());
 
 743         future3.complete(outcome);
 
 745         assertTrue(executor.runAll(MAX_REQUESTS));
 
 746         assertFalse(result.isDone());
 
 747         future2.complete(outcome);
 
 749         // all of them are now done
 
 750         assertTrue(executor.runAll(MAX_REQUESTS));
 
 751         assertTrue(result.isDone());
 
 752         assertSame(outcome, result.get());
 
 756     public void testAllOfList() throws Exception {
 
 757         final OperationOutcome outcome = params.makeOutcome();
 
 759         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 760         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 761         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 763         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 764         tasks.add(() -> future1);
 
 765         tasks.add(() -> future2);
 
 766         tasks.add(() -> null);
 
 767         tasks.add(() -> future3);
 
 769         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
 771         assertTrue(executor.runAll(MAX_REQUESTS));
 
 772         assertFalse(result.isDone());
 
 773         future1.complete(outcome);
 
 775         // complete 3 before 2
 
 776         assertTrue(executor.runAll(MAX_REQUESTS));
 
 777         assertFalse(result.isDone());
 
 778         future3.complete(outcome);
 
 780         assertTrue(executor.runAll(MAX_REQUESTS));
 
 781         assertFalse(result.isDone());
 
 782         future2.complete(outcome);
 
 784         // all of them are now done
 
 785         assertTrue(executor.runAll(MAX_REQUESTS));
 
 786         assertTrue(result.isDone());
 
 787         assertSame(outcome, result.get());
 
 791      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
 
 794     @SuppressWarnings("unchecked")
 
 795     public void testAllOfEdge() throws Exception {
 
 796         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 798         // zero items: check both using a list and using an array
 
 799         assertNull(oper.allOf(tasks));
 
 800         assertNull(oper.allOf());
 
 802         // one item: : check both using a list and using an array
 
 803         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 804         tasks.add(() -> future1);
 
 806         assertSame(future1, oper.allOf(tasks));
 
 807         assertSame(future1, oper.allOf(() -> future1));
 
 811     public void testAttachFutures() throws Exception {
 
 812         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 814         // third task throws an exception during construction
 
 815         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 816         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
 
 817         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
 
 818         tasks.add(() -> future1);
 
 819         tasks.add(() -> future2);
 
 821             throw new IllegalStateException(EXPECTED_EXCEPTION);
 
 823         tasks.add(() -> future3);
 
 825         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
 
 827         // should have canceled the first two, but not the last
 
 828         assertTrue(future1.isCancelled());
 
 829         assertTrue(future2.isCancelled());
 
 830         assertFalse(future3.isCancelled());
 
 834     public void testCombineOutcomes() throws Exception {
 
 836         verifyOutcomes(0, PolicyResult.SUCCESS);
 
 837         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
 
 839         // maximum is in different positions
 
 840         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
 
 841         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
 
 842         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
 
 844         // null outcome - takes precedence over a success
 
 845         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 846         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
 
 847         tasks.add(() -> CompletableFuture.completedFuture(null));
 
 848         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
 
 849         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
 851         assertTrue(executor.runAll(MAX_REQUESTS));
 
 852         assertTrue(result.isDone());
 
 853         assertNull(result.get());
 
 855         // one throws an exception during execution
 
 856         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
 
 859         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
 
 860         tasks.add(() -> CompletableFuture.failedFuture(except));
 
 861         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
 
 862         result = oper.allOf(tasks);
 
 864         assertTrue(executor.runAll(MAX_REQUESTS));
 
 865         assertTrue(result.isCompletedExceptionally());
 
 866         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
 
 870      * Tests both flavors of sequence(), because one invokes the other.
 
 873     public void testSequence() throws Exception {
 
 874         final OperationOutcome outcome = params.makeOutcome();
 
 876         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 877         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 878         tasks.add(() -> null);
 
 879         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 880         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 882         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
 
 883         assertTrue(executor.runAll(MAX_REQUESTS));
 
 884         assertTrue(result.isDone());
 
 885         assertSame(outcome, result.get());
 
 887         // repeat using array form
 
 888         @SuppressWarnings("unchecked")
 
 889         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
 
 890         result = oper.sequence(tasks.toArray(taskArray));
 
 891         assertTrue(executor.runAll(MAX_REQUESTS));
 
 892         assertTrue(result.isDone());
 
 893         assertSame(outcome, result.get());
 
 895         // second task fails, third should not run
 
 896         OperationOutcome failure = params.makeOutcome();
 
 897         failure.setResult(PolicyResult.FAILURE);
 
 899         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 900         tasks.add(() -> CompletableFuture.completedFuture(failure));
 
 901         tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 903         result = oper.sequence(tasks);
 
 904         assertTrue(executor.runAll(MAX_REQUESTS));
 
 905         assertTrue(result.isDone());
 
 906         assertSame(failure, result.get());
 
 910      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
 
 913     @SuppressWarnings("unchecked")
 
 914     public void testSequenceEdge() throws Exception {
 
 915         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 917         // zero items: check both using a list and using an array
 
 918         assertNull(oper.sequence(tasks));
 
 919         assertNull(oper.sequence());
 
 921         // one item: : check both using a list and using an array
 
 922         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
 
 923         tasks.add(() -> future1);
 
 925         assertSame(future1, oper.sequence(tasks));
 
 926         assertSame(future1, oper.sequence(() -> future1));
 
 929     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
 
 930         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
 
 932         OperationOutcome expectedOutcome = null;
 
 934         for (int count = 0; count < results.length; ++count) {
 
 935             OperationOutcome outcome = params.makeOutcome();
 
 936             outcome.setResult(results[count]);
 
 937             tasks.add(() -> CompletableFuture.completedFuture(outcome));
 
 939             if (count == expected) {
 
 940                 expectedOutcome = outcome;
 
 944         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
 
 946         assertTrue(executor.runAll(MAX_REQUESTS));
 
 947         assertTrue(result.isDone());
 
 948         assertSame(expectedOutcome, result.get());
 
 952     public void testDetmPriority() throws CoderException {
 
 953         assertEquals(1, oper.detmPriority(null));
 
 955         OperationOutcome outcome = params.makeOutcome();
 
 957         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
 
 958                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
 
 959                         PolicyResult.FAILURE_EXCEPTION, 6);
 
 961         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
 
 962             outcome.setResult(ent.getKey());
 
 963             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
 
 967          * Test null result. We can't actually set it to null, because the set() method
 
 968          * won't allow it. Instead, we decode it from a structure.
 
 970         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
 
 971         assertEquals(1, oper.detmPriority(outcome));
 
 975      * Tests callbackStarted() when the pipeline has already been stopped.
 
 978     public void testCallbackStartedNotRunning() {
 
 979         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
 
 982          * arrange to stop the controller when the start-callback is invoked, but capture
 
 985         params = params.toBuilder().startCallback(oper -> {
 
 987             future.get().cancel(false);
 
 990         // new params, thus need a new operation
 
 993         future.set(oper.start());
 
 994         assertTrue(executor.runAll(MAX_REQUESTS));
 
 996         // should have only run once
 
 997         assertEquals(1, numStart);
 
1001      * Tests callbackCompleted() when the pipeline has already been stopped.
 
1004     public void testCallbackCompletedNotRunning() {
 
1005         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
 
1007         // arrange to stop the controller when the start-callback is invoked
 
1008         params = params.toBuilder().startCallback(oper -> {
 
1009             future.get().cancel(false);
 
1012         // new params, thus need a new operation
 
1013         oper = new MyOper();
 
1015         future.set(oper.start());
 
1016         assertTrue(executor.runAll(MAX_REQUESTS));
 
1018         // should not have been set
 
1020         assertEquals(0, numEnd);
 
1024     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
 
1025         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
 
1027         OperationOutcome outcome;
 
1029         outcome = new OperationOutcome();
 
1030         oper.setOutcome(outcome, timex);
 
1031         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1032         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
 
1034         outcome = new OperationOutcome();
 
1035         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
 
1036         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1037         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
 
1041     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
 
1042         OperationOutcome outcome;
 
1044         outcome = new OperationOutcome();
 
1045         oper.setOutcome(outcome, PolicyResult.SUCCESS);
 
1046         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
 
1047         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
 
1049         oper.setOutcome(outcome, PolicyResult.SUCCESS);
 
1050         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
 
1051         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
 
1053         for (PolicyResult result : FAILURE_RESULTS) {
 
1054             outcome = new OperationOutcome();
 
1055             oper.setOutcome(outcome, result);
 
1056             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
 
1057             assertEquals(result.toString(), result, outcome.getResult());
 
1062     public void testIsTimeout() {
 
1063         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
 
1065         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
 
1066         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
 
1067         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
 
1068         assertFalse(oper.isTimeout(new CompletionException(null)));
 
1069         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
 
1071         assertTrue(oper.isTimeout(timex));
 
1072         assertTrue(oper.isTimeout(new CompletionException(timex)));
 
1076     public void testLogMessage() {
 
1077         final String infraStr = SINK_INFRA.toString();
 
1079         // log structured data
 
1080         appender.clearExtractions();
 
1081         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
 
1082         List<String> output = appender.getExtracted();
 
1083         assertEquals(1, output.size());
 
1085         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
 
1086                         .contains("{\n  \"text\": \"my-text\"\n}");
 
1088         // repeat with a response
 
1089         appender.clearExtractions();
 
1090         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
 
1091         output = appender.getExtracted();
 
1092         assertEquals(1, output.size());
 
1094         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
 
1095                         .contains("{\n  \"text\": \"my-text\"\n}");
 
1097         // log a plain string
 
1098         appender.clearExtractions();
 
1099         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
 
1100         output = appender.getExtracted();
 
1101         assertEquals(1, output.size());
 
1102         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
 
1104         // log a null request
 
1105         appender.clearExtractions();
 
1106         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
 
1107         output = appender.getExtracted();
 
1108         assertEquals(1, output.size());
 
1110         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
 
1112         // generate exception from coder
 
1113         setOperCoderException();
 
1115         appender.clearExtractions();
 
1116         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
 
1117         output = appender.getExtracted();
 
1118         assertEquals(2, output.size());
 
1119         assertThat(output.get(0)).contains("cannot pretty-print request");
 
1120         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
 
1122         // repeat with a response
 
1123         appender.clearExtractions();
 
1124         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
 
1125         output = appender.getExtracted();
 
1126         assertEquals(2, output.size());
 
1127         assertThat(output.get(0)).contains("cannot pretty-print response");
 
1128         assertThat(output.get(1)).contains(MY_SOURCE);
 
1132     public void testGetRetry() {
 
1133         assertEquals(0, oper.getRetry(null));
 
1134         assertEquals(10, oper.getRetry(10));
 
1138     public void testGetRetryWait() {
 
1139         // need an operator that doesn't override the retry time
 
1140         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
 
1141         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
 
1145     public void testGetTimeOutMs() {
 
1146         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
 
1148         params = params.toBuilder().timeoutSec(null).build();
 
1150         // new params, thus need a new operation
 
1151         oper = new MyOper();
 
1153         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
 
1156     private void starter(OperationOutcome oper) {
 
1158         tstart = oper.getStart();
 
1163     private void completer(OperationOutcome oper) {
 
1170      * Gets a function that does nothing.
 
1172      * @param <T> type of input parameter expected by the function
 
1173      * @return a function that does nothing
 
1175     private <T> Consumer<T> noop() {
 
1180     private OperationOutcome makeSuccess() {
 
1181         OperationOutcome outcome = params.makeOutcome();
 
1182         outcome.setResult(PolicyResult.SUCCESS);
 
1187     private OperationOutcome makeFailure() {
 
1188         OperationOutcome outcome = params.makeOutcome();
 
1189         outcome.setResult(PolicyResult.FAILURE);
 
1197      * @param testName test name
 
1198      * @param expectedCallbacks number of callbacks expected
 
1199      * @param expectedOperations number of operation invocations expected
 
1200      * @param expectedResult expected outcome
 
1202     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
 
1203                     PolicyResult expectedResult) {
 
1205         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
 
1211      * @param testName test name
 
1212      * @param expectedCallbacks number of callbacks expected
 
1213      * @param expectedOperations number of operation invocations expected
 
1214      * @param expectedResult expected outcome
 
1215      * @param manipulator function to modify the future returned by
 
1216      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
 
1217      *        in the executor are run
 
1219     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
 
1220                     Consumer<CompletableFuture<OperationOutcome>> manipulator) {
 
1228         CompletableFuture<OperationOutcome> future = oper.start();
 
1230         manipulator.accept(future);
 
1232         assertTrue(testName, executor.runAll(MAX_REQUESTS));
 
1234         assertEquals(testName, expectedCallbacks, numStart);
 
1235         assertEquals(testName, expectedCallbacks, numEnd);
 
1237         if (expectedCallbacks > 0) {
 
1238             assertNotNull(testName, opstart);
 
1239             assertNotNull(testName, opend);
 
1240             assertEquals(testName, expectedResult, opend.getResult());
 
1242             assertSame(testName, tstart, opstart.getStart());
 
1243             assertSame(testName, tstart, opend.getStart());
 
1246                 assertTrue(future.isDone());
 
1247                 assertEquals(testName, opend, future.get());
 
1249                 // "start" is never final
 
1250                 for (OperationOutcome outcome : starts) {
 
1251                     assertFalse(testName, outcome.isFinalOutcome());
 
1254                 // only the last "complete" is final
 
1255                 assertTrue(testName, ends.removeLast().isFinalOutcome());
 
1257                 for (OperationOutcome outcome : ends) {
 
1258                     assertFalse(outcome.isFinalOutcome());
 
1261             } catch (InterruptedException | ExecutionException e) {
 
1262                 throw new IllegalStateException(e);
 
1265             if (expectedOperations > 0) {
 
1266                 assertNotNull(testName, oper.getSubRequestId());
 
1267                 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
 
1268                 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
 
1272         assertEquals(testName, expectedOperations, oper.getCount());
 
1276      * Creates a new {@link #oper} whose coder will throw an exception.
 
1278     private void setOperCoderException() {
 
1279         oper = new MyOper() {
 
1281             protected Coder getCoder() {
 
1282                 return new StandardCoder() {
 
1284                     public String encode(Object object, boolean pretty) throws CoderException {
 
1285                         throw new CoderException(EXPECTED_EXCEPTION);
 
1294     public static class MyData {
 
1295         private String text = TEXT;
 
1299     private class MyOper extends OperationPartial {
 
1301         private int count = 0;
 
1304         private boolean genException;
 
1306         private int maxFailures = 0;
 
1308         private CompletableFuture<OperationOutcome> preProc;
 
1312             super(OperationPartialTest.this.params, config, PROP_NAMES);
 
1316         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
 
1319                 throw new IllegalStateException(EXPECTED_EXCEPTION);
 
1322             operation.setSubRequestId(String.valueOf(attempt));
 
1324             if (count > maxFailures) {
 
1325                 operation.setResult(PolicyResult.SUCCESS);
 
1327                 operation.setResult(PolicyResult.FAILURE);
 
1334         protected long getRetryWaitMs() {
 
1336              * Sleep timers run in the background, but we want to control things via the
 
1337              * "executor", thus we avoid sleep timers altogether by simply returning 0.
 
1343         protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
 
1344             return (preProc != null ? preProc : super.startPreprocessorAsync());