Clean up and enhancement of Actor re-design
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / pipeline / PipelineControllerFutureTest.java
index b421c1c..a6b11ef 100644 (file)
@@ -23,21 +23,27 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
@@ -58,9 +64,10 @@ public class PipelineControllerFutureTest {
     private Future<String> future2;
 
     @Mock
-    private CompletableFuture<String> compFuture;
+    private Executor executor;
 
 
+    private CompletableFuture<String> compFuture;
     private PipelineControllerFuture<String> controller;
 
 
@@ -72,6 +79,8 @@ public class PipelineControllerFutureTest {
     public void setUp() {
         MockitoAnnotations.initMocks(this);
 
+        compFuture = spy(new CompletableFuture<>());
+
         controller = new PipelineControllerFuture<>();
 
         controller.add(runnable1);
@@ -89,10 +98,7 @@ public class PipelineControllerFutureTest {
         assertTrue(controller.isCancelled());
         assertFalse(controller.isRunning());
 
-        verify(runnable1).run();
-        verify(runnable2).run();
-        verify(future1).cancel(anyBoolean());
-        verify(future2).cancel(anyBoolean());
+        verifyStopped();
 
         // re-invoke; nothing should change
         assertTrue(controller.cancel(true));
@@ -100,10 +106,155 @@ public class PipelineControllerFutureTest {
         assertTrue(controller.isCancelled());
         assertFalse(controller.isRunning());
 
-        verify(runnable1).run();
-        verify(runnable2).run();
-        verify(future1).cancel(anyBoolean());
-        verify(future2).cancel(anyBoolean());
+        verifyStopped();
+    }
+
+    @Test
+    public void testCompleteT() throws Exception {
+        assertTrue(controller.complete(TEXT));
+        assertEquals(TEXT, controller.get());
+
+        verifyStopped();
+
+        // repeat - disallowed
+        assertFalse(controller.complete(TEXT));
+    }
+
+    @Test
+    public void testCompleteExceptionallyThrowable() {
+        assertTrue(controller.completeExceptionally(EXPECTED_EXCEPTION));
+        assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
+
+        verifyStopped();
+
+        // repeat - disallowed
+        assertFalse(controller.completeExceptionally(EXPECTED_EXCEPTION));
+    }
+
+    @Test
+    public void testCompleteAsyncSupplierOfQextendsTExecutor() throws Exception {
+        CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
+
+        // haven't stopped anything yet
+        assertFalse(future.isDone());
+        verify(runnable1, never()).run();
+
+        // get the operation and run it
+        ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+        verify(executor).execute(captor.capture());
+        captor.getValue().run();
+
+        // should be done now
+        assertTrue(future.isDone());
+
+        assertEquals(TEXT, future.get());
+
+        verifyStopped();
+    }
+
+    /**
+     * Tests completeAsync(executor) when canceled before execution.
+     */
+    @Test
+    public void testCompleteAsyncSupplierOfQextendsTExecutorCanceled() throws Exception {
+        CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
+
+        assertTrue(future.cancel(false));
+
+        verifyStopped();
+
+        assertTrue(future.isDone());
+
+        assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
+    }
+
+    @Test
+    public void testCompleteAsyncSupplierOfQextendsT() throws Exception {
+        CompletableFuture<String> future = controller.completeAsync(() -> TEXT);
+        assertEquals(TEXT, future.get());
+
+        verifyStopped();
+    }
+
+    /**
+     * Tests completeAsync() when canceled.
+     */
+    @Test
+    public void testCompleteAsyncSupplierOfQextendsTCanceled() throws Exception {
+        CountDownLatch canceled = new CountDownLatch(1);
+
+        // run async, but await until canceled
+        CompletableFuture<String> future = controller.completeAsync(() -> {
+            try {
+                canceled.await();
+            } catch (InterruptedException e) {
+                // do nothing
+            }
+
+            return TEXT;
+        });
+
+        assertTrue(future.cancel(false));
+
+        // let the future run now
+        canceled.countDown();
+
+        verifyStopped();
+
+        assertTrue(future.isDone());
+
+        assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
+    }
+
+    @Test
+    public void testCompleteOnTimeoutTLongTimeUnit() throws Exception {
+        CountDownLatch stopped = new CountDownLatch(1);
+        controller.add(() -> stopped.countDown());
+
+        CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 1, TimeUnit.MILLISECONDS);
+
+        assertEquals(TEXT, future.get());
+
+        /*
+         * Must use latch instead of verifyStopped(), because the runnables may be
+         * executed asynchronously.
+         */
+        assertTrue(stopped.await(5, TimeUnit.SECONDS));
+    }
+
+    /**
+     * Tests completeOnTimeout() when completed before the timeout.
+     */
+    @Test
+    public void testCompleteOnTimeoutTLongTimeUnitNoTimeout() throws Exception {
+        CompletableFuture<String> future = controller.completeOnTimeout("timed out", 5, TimeUnit.SECONDS);
+        controller.complete(TEXT);
+
+        assertEquals(TEXT, future.get());
+
+        verifyStopped();
+    }
+
+    /**
+     * Tests completeOnTimeout() when canceled before the timeout.
+     */
+    @Test
+    public void testCompleteOnTimeoutTLongTimeUnitCanceled() {
+        CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 5, TimeUnit.SECONDS);
+        assertTrue(future.cancel(true));
+
+        assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
+
+        verifyStopped();
+    }
+
+    @Test
+    public void testNewIncompleteFuture() {
+        PipelineControllerFuture<String> future = controller.newIncompleteFuture();
+        assertNotNull(future);
+        assertTrue(future instanceof PipelineControllerFuture);
+        assertNotSame(controller, future);
+        assertFalse(future.isDone());
     }
 
     @Test
@@ -208,22 +359,81 @@ public class PipelineControllerFutureTest {
         verify(future2).cancel(anyBoolean());
     }
 
+    /**
+     * Tests both wrap() methods.
+     */
     @Test
-    public void testAddFunction() {
-        AtomicReference<String> value = new AtomicReference<>();
+    public void testWrap() throws Exception {
+        controller = spy(controller);
 
-        Function<String, CompletableFuture<String>> func = controller.add(input -> {
-            value.set(input);
+        CompletableFuture<String> future = controller.wrap(compFuture);
+        verify(controller, never()).remove(compFuture);
+
+        compFuture.complete(TEXT);
+        assertEquals(TEXT, future.get());
+
+        verify(controller).remove(compFuture);
+    }
+
+    /**
+     * Tests wrap(), when the controller is not running.
+     */
+    @Test
+    public void testWrapNotRunning() throws Exception {
+        controller.cancel(false);
+        controller = spy(controller);
+
+        assertFalse(controller.wrap(compFuture).isDone());
+        verify(controller, never()).add(compFuture);
+        verify(controller, never()).remove(compFuture);
+
+        verify(compFuture).cancel(anyBoolean());
+    }
+
+    /**
+     * Tests wrap(), when the future throws an exception.
+     */
+    @Test
+    public void testWrapException() throws Exception {
+        controller = spy(controller);
+
+        CompletableFuture<String> future = controller.wrap(compFuture);
+        verify(controller, never()).remove(compFuture);
+
+        compFuture.completeExceptionally(EXPECTED_EXCEPTION);
+        assertThatThrownBy(() -> future.get()).hasCause(EXPECTED_EXCEPTION);
+
+        verify(controller).remove(compFuture);
+    }
+
+    @Test
+    public void testWrapFunction() throws Exception {
+
+        Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
+            compFuture.complete(input);
             return compFuture;
         });
 
-        assertSame(compFuture, func.apply(TEXT));
-        assertEquals(TEXT, value.get());
+        CompletableFuture<String> future = func.apply(TEXT);
+        assertTrue(compFuture.isDone());
 
-        verify(compFuture, never()).cancel(anyBoolean());
+        assertEquals(TEXT, future.get());
 
         // should not have completed the controller
         assertFalse(controller.isDone());
+    }
+
+    /**
+     * Tests add(Function) when the controller is canceled after the future is added.
+     */
+    @Test
+    public void testWrapFunctionCancel() throws Exception {
+        Function<String, CompletableFuture<String>> func = controller.wrap(input -> compFuture);
+
+        CompletableFuture<String> future = func.apply(TEXT);
+        assertFalse(future.isDone());
+
+        assertFalse(compFuture.isDone());
 
         // cancel - should propagate
         controller.cancel(false);
@@ -235,10 +445,10 @@ public class PipelineControllerFutureTest {
      * Tests add(Function) when the controller is not running.
      */
     @Test
-    public void testAddFunctionNotRunning() {
+    public void testWrapFunctionNotRunning() {
         AtomicReference<String> value = new AtomicReference<>();
 
-        Function<String, CompletableFuture<String>> func = controller.add(input -> {
+        Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
             value.set(input);
             return compFuture;
         });
@@ -251,4 +461,11 @@ public class PipelineControllerFutureTest {
 
         assertNull(value.get());
     }
+
+    private void verifyStopped() {
+        verify(runnable1).run();
+        verify(runnable2).run();
+        verify(future1).cancel(anyBoolean());
+        verify(future2).cancel(anyBoolean());
+    }
 }