2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.pipeline;
 
  23 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
  24 import static org.junit.Assert.assertEquals;
 
  25 import static org.junit.Assert.assertFalse;
 
  26 import static org.junit.Assert.assertNotNull;
 
  27 import static org.junit.Assert.assertNotSame;
 
  28 import static org.junit.Assert.assertNull;
 
  29 import static org.junit.Assert.assertTrue;
 
  30 import static org.mockito.ArgumentMatchers.anyBoolean;
 
  31 import static org.mockito.Mockito.never;
 
  32 import static org.mockito.Mockito.spy;
 
  33 import static org.mockito.Mockito.verify;
 
  35 import java.util.concurrent.CancellationException;
 
  36 import java.util.concurrent.CompletableFuture;
 
  37 import java.util.concurrent.CountDownLatch;
 
  38 import java.util.concurrent.Executor;
 
  39 import java.util.concurrent.Future;
 
  40 import java.util.concurrent.TimeUnit;
 
  41 import java.util.concurrent.atomic.AtomicReference;
 
  42 import java.util.function.BiConsumer;
 
  43 import java.util.function.Function;
 
  44 import org.junit.Before;
 
  45 import org.junit.Test;
 
  46 import org.mockito.ArgumentCaptor;
 
  47 import org.mockito.Mock;
 
  48 import org.mockito.MockitoAnnotations;
 
  50 public class PipelineControllerFutureTest {
 
  51     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
 
  52     private static final String TEXT = "some text";
 
  55     private Runnable runnable1;
 
  58     private Runnable runnable2;
 
  61     private Future<String> future1;
 
  64     private Future<String> future2;
 
  67     private Executor executor;
 
  70     private CompletableFuture<String> compFuture;
 
  71     private PipelineControllerFuture<String> controller;
 
  75      * Initializes fields, including {@link #controller}. Adds all runners and futures to
 
  80         MockitoAnnotations.initMocks(this);
 
  82         compFuture = spy(new CompletableFuture<>());
 
  84         controller = new PipelineControllerFuture<>();
 
  86         controller.add(runnable1);
 
  87         controller.add(future1);
 
  88         controller.add(runnable2);
 
  89         controller.add(future2);
 
  93     public void testCancel_testAddFutureOfFBoolean_testAddRunnable__testIsRunning() {
 
  94         assertTrue(controller.isRunning());
 
  96         assertTrue(controller.cancel(false));
 
  98         assertTrue(controller.isCancelled());
 
  99         assertFalse(controller.isRunning());
 
 103         // re-invoke; nothing should change
 
 104         assertTrue(controller.cancel(true));
 
 106         assertTrue(controller.isCancelled());
 
 107         assertFalse(controller.isRunning());
 
 113     public void testCompleteT() throws Exception {
 
 114         assertTrue(controller.complete(TEXT));
 
 115         assertEquals(TEXT, controller.get());
 
 119         // repeat - disallowed
 
 120         assertFalse(controller.complete(TEXT));
 
 124     public void testCompleteExceptionallyThrowable() {
 
 125         assertTrue(controller.completeExceptionally(EXPECTED_EXCEPTION));
 
 126         assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
 
 130         // repeat - disallowed
 
 131         assertFalse(controller.completeExceptionally(EXPECTED_EXCEPTION));
 
 135     public void testCompleteAsyncSupplierOfQextendsTExecutor() throws Exception {
 
 136         CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
 
 138         // haven't stopped anything yet
 
 139         assertFalse(future.isDone());
 
 140         verify(runnable1, never()).run();
 
 142         // get the operation and run it
 
 143         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
 
 144         verify(executor).execute(captor.capture());
 
 145         captor.getValue().run();
 
 147         // should be done now
 
 148         assertTrue(future.isDone());
 
 150         assertEquals(TEXT, future.get());
 
 156      * Tests completeAsync(executor) when canceled before execution.
 
 159     public void testCompleteAsyncSupplierOfQextendsTExecutorCanceled() throws Exception {
 
 160         CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
 
 162         assertTrue(future.cancel(false));
 
 166         assertTrue(future.isDone());
 
 168         assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
 
 172     public void testCompleteAsyncSupplierOfQextendsT() throws Exception {
 
 173         CompletableFuture<String> future = controller.completeAsync(() -> TEXT);
 
 174         assertEquals(TEXT, future.get());
 
 180      * Tests completeAsync() when canceled.
 
 183     public void testCompleteAsyncSupplierOfQextendsTCanceled() throws Exception {
 
 184         CountDownLatch canceled = new CountDownLatch(1);
 
 186         // run async, but await until canceled
 
 187         CompletableFuture<String> future = controller.completeAsync(() -> {
 
 190             } catch (InterruptedException e) {
 
 197         assertTrue(future.cancel(false));
 
 199         // let the future run now
 
 200         canceled.countDown();
 
 204         assertTrue(future.isDone());
 
 206         assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
 
 210     public void testCompleteOnTimeoutTLongTimeUnit() throws Exception {
 
 211         CountDownLatch stopped = new CountDownLatch(1);
 
 212         controller.add(() -> stopped.countDown());
 
 214         CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 1, TimeUnit.MILLISECONDS);
 
 216         assertEquals(TEXT, future.get());
 
 219          * Must use latch instead of verifyStopped(), because the runnables may be
 
 220          * executed asynchronously.
 
 222         assertTrue(stopped.await(5, TimeUnit.SECONDS));
 
 226      * Tests completeOnTimeout() when completed before the timeout.
 
 229     public void testCompleteOnTimeoutTLongTimeUnitNoTimeout() throws Exception {
 
 230         CompletableFuture<String> future = controller.completeOnTimeout("timed out", 5, TimeUnit.SECONDS);
 
 231         controller.complete(TEXT);
 
 233         assertEquals(TEXT, future.get());
 
 239      * Tests completeOnTimeout() when canceled before the timeout.
 
 242     public void testCompleteOnTimeoutTLongTimeUnitCanceled() {
 
 243         CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 5, TimeUnit.SECONDS);
 
 244         assertTrue(future.cancel(true));
 
 246         assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
 
 252     public void testNewIncompleteFuture() {
 
 253         PipelineControllerFuture<String> future = controller.newIncompleteFuture();
 
 254         assertNotNull(future);
 
 255         assertTrue(future instanceof PipelineControllerFuture);
 
 256         assertNotSame(controller, future);
 
 257         assertFalse(future.isDone());
 
 261     public void testDelayedComplete() throws Exception {
 
 262         controller.add(runnable1);
 
 264         BiConsumer<String, Throwable> stopper = controller.delayedComplete();
 
 266         // shouldn't have run yet
 
 267         assertTrue(controller.isRunning());
 
 268         verify(runnable1, never()).run();
 
 270         stopper.accept(TEXT, null);
 
 272         assertTrue(controller.isDone());
 
 273         assertEquals(TEXT, controller.get());
 
 275         assertFalse(controller.isRunning());
 
 276         verify(runnable1).run();
 
 278         // re-invoke; nothing should change
 
 279         stopper.accept(TEXT, EXPECTED_EXCEPTION);
 
 280         assertFalse(controller.isCompletedExceptionally());
 
 282         assertFalse(controller.isRunning());
 
 283         verify(runnable1).run();
 
 287      * Tests delayedComplete() when an exception is generated.
 
 290     public void testDelayedCompleteWithException() throws Exception {
 
 291         controller.add(runnable1);
 
 293         BiConsumer<String, Throwable> stopper = controller.delayedComplete();
 
 295         // shouldn't have run yet
 
 296         assertTrue(controller.isRunning());
 
 297         verify(runnable1, never()).run();
 
 299         stopper.accept(TEXT, EXPECTED_EXCEPTION);
 
 301         assertTrue(controller.isDone());
 
 302         assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
 
 304         assertFalse(controller.isRunning());
 
 305         verify(runnable1).run();
 
 307         // re-invoke; nothing should change
 
 308         stopper.accept(TEXT, null);
 
 309         assertTrue(controller.isCompletedExceptionally());
 
 311         assertFalse(controller.isRunning());
 
 312         verify(runnable1).run();
 
 316     public void testDelayedRemoveFutureOfF() throws Exception {
 
 317         BiConsumer<String, Throwable> remover = controller.delayedRemove(future1);
 
 319         remover.accept(TEXT, EXPECTED_EXCEPTION);
 
 321         // should not have completed the controller
 
 322         assertFalse(controller.isDone());
 
 324         verify(future1, never()).cancel(anyBoolean());
 
 326         controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
 
 328         verify(future1, never()).cancel(anyBoolean());
 
 329         verify(future2).cancel(anyBoolean());
 
 333     public void testDelayedRemoveRunnable() throws Exception {
 
 334         BiConsumer<String, Throwable> remover = controller.delayedRemove(runnable1);
 
 336         remover.accept(TEXT, EXPECTED_EXCEPTION);
 
 338         // should not have completed the controller
 
 339         assertFalse(controller.isDone());
 
 341         verify(runnable1, never()).run();
 
 343         controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
 
 345         verify(runnable1, never()).run();
 
 346         verify(runnable2).run();
 
 350     public void testRemoveFutureOfF_testRemoveRunnable() {
 
 351         controller.remove(runnable2);
 
 352         controller.remove(future1);
 
 354         controller.cancel(true);
 
 356         verify(runnable1).run();
 
 357         verify(runnable2, never()).run();
 
 358         verify(future1, never()).cancel(anyBoolean());
 
 359         verify(future2).cancel(anyBoolean());
 
 363      * Tests both wrap() methods.
 
 366     public void testWrap() throws Exception {
 
 367         controller = spy(controller);
 
 369         CompletableFuture<String> future = controller.wrap(compFuture);
 
 370         verify(controller, never()).remove(compFuture);
 
 372         compFuture.complete(TEXT);
 
 373         assertEquals(TEXT, future.get());
 
 375         verify(controller).remove(compFuture);
 
 379      * Tests wrap(), when the controller is not running.
 
 382     public void testWrapNotRunning() throws Exception {
 
 383         controller.cancel(false);
 
 384         controller = spy(controller);
 
 386         assertFalse(controller.wrap(compFuture).isDone());
 
 387         verify(controller, never()).add(compFuture);
 
 388         verify(controller, never()).remove(compFuture);
 
 390         verify(compFuture).cancel(anyBoolean());
 
 394      * Tests wrap(), when the future throws an exception.
 
 397     public void testWrapException() throws Exception {
 
 398         controller = spy(controller);
 
 400         CompletableFuture<String> future = controller.wrap(compFuture);
 
 401         verify(controller, never()).remove(compFuture);
 
 403         compFuture.completeExceptionally(EXPECTED_EXCEPTION);
 
 404         assertThatThrownBy(() -> future.get()).hasCause(EXPECTED_EXCEPTION);
 
 406         verify(controller).remove(compFuture);
 
 410     public void testWrapFunction() throws Exception {
 
 412         Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
 
 413             compFuture.complete(input);
 
 417         CompletableFuture<String> future = func.apply(TEXT);
 
 418         assertTrue(compFuture.isDone());
 
 420         assertEquals(TEXT, future.get());
 
 422         // should not have completed the controller
 
 423         assertFalse(controller.isDone());
 
 427      * Tests wrap(Function) when the controller is canceled after the future is added.
 
 430     public void testWrapFunctionCancel() throws Exception {
 
 431         Function<String, CompletableFuture<String>> func = controller.wrap(input -> compFuture);
 
 433         CompletableFuture<String> future = func.apply(TEXT);
 
 434         assertFalse(future.isDone());
 
 436         assertFalse(compFuture.isDone());
 
 438         // cancel - should propagate
 
 439         controller.cancel(false);
 
 441         verify(compFuture).cancel(anyBoolean());
 
 445      * Tests wrap(Function) when the controller is not running.
 
 448     public void testWrapFunctionNotRunning() {
 
 449         AtomicReference<String> value = new AtomicReference<>();
 
 451         Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
 
 456         controller.cancel(false);
 
 458         CompletableFuture<String> fut = func.apply(TEXT);
 
 459         assertNotSame(compFuture, fut);
 
 460         assertFalse(fut.isDone());
 
 462         assertNull(value.get());
 
 465     private void verifyStopped() {
 
 466         verify(runnable1).run();
 
 467         verify(runnable2).run();
 
 468         verify(future1).cancel(anyBoolean());
 
 469         verify(future2).cancel(anyBoolean());