2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.pipeline;
23 import static org.assertj.core.api.Assertions.assertThatThrownBy;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertNotSame;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertTrue;
30 import static org.mockito.ArgumentMatchers.anyBoolean;
31 import static org.mockito.Mockito.never;
32 import static org.mockito.Mockito.spy;
33 import static org.mockito.Mockito.verify;
35 import java.util.concurrent.CancellationException;
36 import java.util.concurrent.CompletableFuture;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import java.util.function.BiConsumer;
43 import java.util.function.Function;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.ArgumentCaptor;
47 import org.mockito.Mock;
48 import org.mockito.MockitoAnnotations;
50 public class PipelineControllerFutureTest {
51 private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
52 private static final String TEXT = "some text";
55 private Runnable runnable1;
58 private Runnable runnable2;
61 private Future<String> future1;
64 private Future<String> future2;
67 private Executor executor;
70 private CompletableFuture<String> compFuture;
71 private PipelineControllerFuture<String> controller;
75 * Initializes fields, including {@link #controller}. Adds all runners and futures to
80 MockitoAnnotations.initMocks(this);
82 compFuture = spy(new CompletableFuture<>());
84 controller = new PipelineControllerFuture<>();
86 controller.add(runnable1);
87 controller.add(future1);
88 controller.add(runnable2);
89 controller.add(future2);
93 public void testCancel_testAddFutureOfFBoolean_testAddRunnable__testIsRunning() {
94 assertTrue(controller.isRunning());
96 assertTrue(controller.cancel(false));
98 assertTrue(controller.isCancelled());
99 assertFalse(controller.isRunning());
103 // re-invoke; nothing should change
104 assertTrue(controller.cancel(true));
106 assertTrue(controller.isCancelled());
107 assertFalse(controller.isRunning());
113 public void testCompleteT() throws Exception {
114 assertTrue(controller.complete(TEXT));
115 assertEquals(TEXT, controller.get());
119 // repeat - disallowed
120 assertFalse(controller.complete(TEXT));
124 public void testCompleteExceptionallyThrowable() {
125 assertTrue(controller.completeExceptionally(EXPECTED_EXCEPTION));
126 assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
130 // repeat - disallowed
131 assertFalse(controller.completeExceptionally(EXPECTED_EXCEPTION));
135 public void testCompleteAsyncSupplierOfQextendsTExecutor() throws Exception {
136 CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
138 // haven't stopped anything yet
139 assertFalse(future.isDone());
140 verify(runnable1, never()).run();
142 // get the operation and run it
143 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
144 verify(executor).execute(captor.capture());
145 captor.getValue().run();
147 // should be done now
148 assertTrue(future.isDone());
150 assertEquals(TEXT, future.get());
156 * Tests completeAsync(executor) when canceled before execution.
159 public void testCompleteAsyncSupplierOfQextendsTExecutorCanceled() throws Exception {
160 CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
162 assertTrue(future.cancel(false));
166 assertTrue(future.isDone());
168 assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
172 public void testCompleteAsyncSupplierOfQextendsT() throws Exception {
173 CompletableFuture<String> future = controller.completeAsync(() -> TEXT);
174 assertEquals(TEXT, future.get());
180 * Tests completeAsync() when canceled.
183 public void testCompleteAsyncSupplierOfQextendsTCanceled() throws Exception {
184 CountDownLatch canceled = new CountDownLatch(1);
186 // run async, but await until canceled
187 CompletableFuture<String> future = controller.completeAsync(() -> {
190 } catch (InterruptedException e) {
197 assertTrue(future.cancel(false));
199 // let the future run now
200 canceled.countDown();
204 assertTrue(future.isDone());
206 assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
210 public void testCompleteOnTimeoutTLongTimeUnit() throws Exception {
211 CountDownLatch stopped = new CountDownLatch(1);
212 controller.add(() -> stopped.countDown());
214 CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 1, TimeUnit.MILLISECONDS);
216 assertEquals(TEXT, future.get());
219 * Must use latch instead of verifyStopped(), because the runnables may be
220 * executed asynchronously.
222 assertTrue(stopped.await(5, TimeUnit.SECONDS));
226 * Tests completeOnTimeout() when completed before the timeout.
229 public void testCompleteOnTimeoutTLongTimeUnitNoTimeout() throws Exception {
230 CompletableFuture<String> future = controller.completeOnTimeout("timed out", 5, TimeUnit.SECONDS);
231 controller.complete(TEXT);
233 assertEquals(TEXT, future.get());
239 * Tests completeOnTimeout() when canceled before the timeout.
242 public void testCompleteOnTimeoutTLongTimeUnitCanceled() {
243 CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 5, TimeUnit.SECONDS);
244 assertTrue(future.cancel(true));
246 assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
252 public void testNewIncompleteFuture() {
253 PipelineControllerFuture<String> future = controller.newIncompleteFuture();
254 assertNotNull(future);
255 assertTrue(future instanceof PipelineControllerFuture);
256 assertNotSame(controller, future);
257 assertFalse(future.isDone());
261 public void testDelayedComplete() throws Exception {
262 controller.add(runnable1);
264 BiConsumer<String, Throwable> stopper = controller.delayedComplete();
266 // shouldn't have run yet
267 assertTrue(controller.isRunning());
268 verify(runnable1, never()).run();
270 stopper.accept(TEXT, null);
272 assertTrue(controller.isDone());
273 assertEquals(TEXT, controller.get());
275 assertFalse(controller.isRunning());
276 verify(runnable1).run();
278 // re-invoke; nothing should change
279 stopper.accept(TEXT, EXPECTED_EXCEPTION);
280 assertFalse(controller.isCompletedExceptionally());
282 assertFalse(controller.isRunning());
283 verify(runnable1).run();
287 * Tests delayedComplete() when an exception is generated.
290 public void testDelayedCompleteWithException() throws Exception {
291 controller.add(runnable1);
293 BiConsumer<String, Throwable> stopper = controller.delayedComplete();
295 // shouldn't have run yet
296 assertTrue(controller.isRunning());
297 verify(runnable1, never()).run();
299 stopper.accept(TEXT, EXPECTED_EXCEPTION);
301 assertTrue(controller.isDone());
302 assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
304 assertFalse(controller.isRunning());
305 verify(runnable1).run();
307 // re-invoke; nothing should change
308 stopper.accept(TEXT, null);
309 assertTrue(controller.isCompletedExceptionally());
311 assertFalse(controller.isRunning());
312 verify(runnable1).run();
316 public void testDelayedRemoveFutureOfF() throws Exception {
317 BiConsumer<String, Throwable> remover = controller.delayedRemove(future1);
319 remover.accept(TEXT, EXPECTED_EXCEPTION);
321 // should not have completed the controller
322 assertFalse(controller.isDone());
324 verify(future1, never()).cancel(anyBoolean());
326 controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
328 verify(future1, never()).cancel(anyBoolean());
329 verify(future2).cancel(anyBoolean());
333 public void testDelayedRemoveRunnable() throws Exception {
334 BiConsumer<String, Throwable> remover = controller.delayedRemove(runnable1);
336 remover.accept(TEXT, EXPECTED_EXCEPTION);
338 // should not have completed the controller
339 assertFalse(controller.isDone());
341 verify(runnable1, never()).run();
343 controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
345 verify(runnable1, never()).run();
346 verify(runnable2).run();
350 public void testRemoveFutureOfF_testRemoveRunnable() {
351 controller.remove(runnable2);
352 controller.remove(future1);
354 controller.cancel(true);
356 verify(runnable1).run();
357 verify(runnable2, never()).run();
358 verify(future1, never()).cancel(anyBoolean());
359 verify(future2).cancel(anyBoolean());
363 * Tests both wrap() methods.
366 public void testWrap() throws Exception {
367 controller = spy(controller);
369 CompletableFuture<String> future = controller.wrap(compFuture);
370 verify(controller, never()).remove(compFuture);
372 compFuture.complete(TEXT);
373 assertEquals(TEXT, future.get());
375 verify(controller).remove(compFuture);
379 * Tests wrap(), when the controller is not running.
382 public void testWrapNotRunning() throws Exception {
383 controller.cancel(false);
384 controller = spy(controller);
386 assertFalse(controller.wrap(compFuture).isDone());
387 verify(controller, never()).add(compFuture);
388 verify(controller, never()).remove(compFuture);
390 verify(compFuture).cancel(anyBoolean());
394 * Tests wrap(), when the future throws an exception.
397 public void testWrapException() throws Exception {
398 controller = spy(controller);
400 CompletableFuture<String> future = controller.wrap(compFuture);
401 verify(controller, never()).remove(compFuture);
403 compFuture.completeExceptionally(EXPECTED_EXCEPTION);
404 assertThatThrownBy(() -> future.get()).hasCause(EXPECTED_EXCEPTION);
406 verify(controller).remove(compFuture);
410 public void testWrapFunction() throws Exception {
412 Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
413 compFuture.complete(input);
417 CompletableFuture<String> future = func.apply(TEXT);
418 assertTrue(compFuture.isDone());
420 assertEquals(TEXT, future.get());
422 // should not have completed the controller
423 assertFalse(controller.isDone());
427 * Tests wrap(Function) when the controller is canceled after the future is added.
430 public void testWrapFunctionCancel() throws Exception {
431 Function<String, CompletableFuture<String>> func = controller.wrap(input -> compFuture);
433 CompletableFuture<String> future = func.apply(TEXT);
434 assertFalse(future.isDone());
436 assertFalse(compFuture.isDone());
438 // cancel - should propagate
439 controller.cancel(false);
441 verify(compFuture).cancel(anyBoolean());
445 * Tests wrap(Function) when the controller is not running.
448 public void testWrapFunctionNotRunning() {
449 AtomicReference<String> value = new AtomicReference<>();
451 Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
456 controller.cancel(false);
458 CompletableFuture<String> fut = func.apply(TEXT);
459 assertNotSame(compFuture, fut);
460 assertFalse(fut.isDone());
462 assertNull(value.get());
465 private void verifyStopped() {
466 verify(runnable1).run();
467 verify(runnable2).run();
468 verify(future1).cancel(anyBoolean());
469 verify(future2).cancel(anyBoolean());