import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
private Future<String> future2;
@Mock
- private CompletableFuture<String> compFuture;
+ private Executor executor;
+ private CompletableFuture<String> compFuture;
private PipelineControllerFuture<String> controller;
public void setUp() {
MockitoAnnotations.initMocks(this);
+ compFuture = spy(new CompletableFuture<>());
+
controller = new PipelineControllerFuture<>();
controller.add(runnable1);
assertTrue(controller.isCancelled());
assertFalse(controller.isRunning());
- verify(runnable1).run();
- verify(runnable2).run();
- verify(future1).cancel(anyBoolean());
- verify(future2).cancel(anyBoolean());
+ verifyStopped();
// re-invoke; nothing should change
assertTrue(controller.cancel(true));
assertTrue(controller.isCancelled());
assertFalse(controller.isRunning());
- verify(runnable1).run();
- verify(runnable2).run();
- verify(future1).cancel(anyBoolean());
- verify(future2).cancel(anyBoolean());
+ verifyStopped();
+ }
+
+ @Test
+ public void testCompleteT() throws Exception {
+ assertTrue(controller.complete(TEXT));
+ assertEquals(TEXT, controller.get());
+
+ verifyStopped();
+
+ // repeat - disallowed
+ assertFalse(controller.complete(TEXT));
+ }
+
+ @Test
+ public void testCompleteExceptionallyThrowable() {
+ assertTrue(controller.completeExceptionally(EXPECTED_EXCEPTION));
+ assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
+
+ verifyStopped();
+
+ // repeat - disallowed
+ assertFalse(controller.completeExceptionally(EXPECTED_EXCEPTION));
+ }
+
+ @Test
+ public void testCompleteAsyncSupplierOfQextendsTExecutor() throws Exception {
+ CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
+
+ // haven't stopped anything yet
+ assertFalse(future.isDone());
+ verify(runnable1, never()).run();
+
+ // get the operation and run it
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(executor).execute(captor.capture());
+ captor.getValue().run();
+
+ // should be done now
+ assertTrue(future.isDone());
+
+ assertEquals(TEXT, future.get());
+
+ verifyStopped();
+ }
+
+ /**
+ * Tests completeAsync(executor) when canceled before execution.
+ */
+ @Test
+ public void testCompleteAsyncSupplierOfQextendsTExecutorCanceled() throws Exception {
+ CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
+
+ assertTrue(future.cancel(false));
+
+ verifyStopped();
+
+ assertTrue(future.isDone());
+
+ assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
+ }
+
+ @Test
+ public void testCompleteAsyncSupplierOfQextendsT() throws Exception {
+ CompletableFuture<String> future = controller.completeAsync(() -> TEXT);
+ assertEquals(TEXT, future.get());
+
+ verifyStopped();
+ }
+
+ /**
+ * Tests completeAsync() when canceled.
+ */
+ @Test
+ public void testCompleteAsyncSupplierOfQextendsTCanceled() throws Exception {
+ CountDownLatch canceled = new CountDownLatch(1);
+
+ // run async, but await until canceled
+ CompletableFuture<String> future = controller.completeAsync(() -> {
+ try {
+ canceled.await();
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+
+ return TEXT;
+ });
+
+ assertTrue(future.cancel(false));
+
+ // let the future run now
+ canceled.countDown();
+
+ verifyStopped();
+
+ assertTrue(future.isDone());
+
+ assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
+ }
+
+ @Test
+ public void testCompleteOnTimeoutTLongTimeUnit() throws Exception {
+ CountDownLatch stopped = new CountDownLatch(1);
+ controller.add(() -> stopped.countDown());
+
+ CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 1, TimeUnit.MILLISECONDS);
+
+ assertEquals(TEXT, future.get());
+
+ /*
+ * Must use latch instead of verifyStopped(), because the runnables may be
+ * executed asynchronously.
+ */
+ assertTrue(stopped.await(5, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Tests completeOnTimeout() when completed before the timeout.
+ */
+ @Test
+ public void testCompleteOnTimeoutTLongTimeUnitNoTimeout() throws Exception {
+ CompletableFuture<String> future = controller.completeOnTimeout("timed out", 5, TimeUnit.SECONDS);
+ controller.complete(TEXT);
+
+ assertEquals(TEXT, future.get());
+
+ verifyStopped();
+ }
+
+ /**
+ * Tests completeOnTimeout() when canceled before the timeout.
+ */
+ @Test
+ public void testCompleteOnTimeoutTLongTimeUnitCanceled() {
+ CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 5, TimeUnit.SECONDS);
+ assertTrue(future.cancel(true));
+
+ assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
+
+ verifyStopped();
+ }
+
+ @Test
+ public void testNewIncompleteFuture() {
+ PipelineControllerFuture<String> future = controller.newIncompleteFuture();
+ assertNotNull(future);
+ assertTrue(future instanceof PipelineControllerFuture);
+ assertNotSame(controller, future);
+ assertFalse(future.isDone());
}
@Test
verify(future2).cancel(anyBoolean());
}
+ /**
+ * Tests both wrap() methods.
+ */
@Test
- public void testAddFunction() {
- AtomicReference<String> value = new AtomicReference<>();
+ public void testWrap() throws Exception {
+ controller = spy(controller);
- Function<String, CompletableFuture<String>> func = controller.add(input -> {
- value.set(input);
+ CompletableFuture<String> future = controller.wrap(compFuture);
+ verify(controller, never()).remove(compFuture);
+
+ compFuture.complete(TEXT);
+ assertEquals(TEXT, future.get());
+
+ verify(controller).remove(compFuture);
+ }
+
+ /**
+ * Tests wrap(), when the controller is not running.
+ */
+ @Test
+ public void testWrapNotRunning() throws Exception {
+ controller.cancel(false);
+ controller = spy(controller);
+
+ assertFalse(controller.wrap(compFuture).isDone());
+ verify(controller, never()).add(compFuture);
+ verify(controller, never()).remove(compFuture);
+
+ verify(compFuture).cancel(anyBoolean());
+ }
+
+ /**
+ * Tests wrap(), when the future throws an exception.
+ */
+ @Test
+ public void testWrapException() throws Exception {
+ controller = spy(controller);
+
+ CompletableFuture<String> future = controller.wrap(compFuture);
+ verify(controller, never()).remove(compFuture);
+
+ compFuture.completeExceptionally(EXPECTED_EXCEPTION);
+ assertThatThrownBy(() -> future.get()).hasCause(EXPECTED_EXCEPTION);
+
+ verify(controller).remove(compFuture);
+ }
+
+ @Test
+ public void testWrapFunction() throws Exception {
+
+ Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
+ compFuture.complete(input);
return compFuture;
});
- assertSame(compFuture, func.apply(TEXT));
- assertEquals(TEXT, value.get());
+ CompletableFuture<String> future = func.apply(TEXT);
+ assertTrue(compFuture.isDone());
- verify(compFuture, never()).cancel(anyBoolean());
+ assertEquals(TEXT, future.get());
// should not have completed the controller
assertFalse(controller.isDone());
+ }
+
+ /**
+ * Tests add(Function) when the controller is canceled after the future is added.
+ */
+ @Test
+ public void testWrapFunctionCancel() throws Exception {
+ Function<String, CompletableFuture<String>> func = controller.wrap(input -> compFuture);
+
+ CompletableFuture<String> future = func.apply(TEXT);
+ assertFalse(future.isDone());
+
+ assertFalse(compFuture.isDone());
// cancel - should propagate
controller.cancel(false);
* Tests add(Function) when the controller is not running.
*/
@Test
- public void testAddFunctionNotRunning() {
+ public void testWrapFunctionNotRunning() {
AtomicReference<String> value = new AtomicReference<>();
- Function<String, CompletableFuture<String>> func = controller.add(input -> {
+ Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
value.set(input);
return compFuture;
});
assertNull(value.get());
}
+
+ private void verifyStopped() {
+ verify(runnable1).run();
+ verify(runnable2).run();
+ verify(future1).cancel(anyBoolean());
+ verify(future2).cancel(anyBoolean());
+ }
}