2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.actorserviceprovider.pipeline;
24 import static org.assertj.core.api.Assertions.assertThatThrownBy;
25 import static org.junit.jupiter.api.Assertions.assertEquals;
26 import static org.junit.jupiter.api.Assertions.assertFalse;
27 import static org.junit.jupiter.api.Assertions.assertNotNull;
28 import static org.junit.jupiter.api.Assertions.assertNotSame;
29 import static org.junit.jupiter.api.Assertions.assertNull;
30 import static org.junit.jupiter.api.Assertions.assertTrue;
31 import static org.mockito.ArgumentMatchers.anyBoolean;
32 import static org.mockito.Mockito.never;
33 import static org.mockito.Mockito.spy;
34 import static org.mockito.Mockito.verify;
36 import java.util.concurrent.CancellationException;
37 import java.util.concurrent.CompletableFuture;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.Executor;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicReference;
43 import java.util.function.BiConsumer;
44 import java.util.function.Function;
45 import org.junit.jupiter.api.BeforeEach;
46 import org.junit.jupiter.api.Test;
47 import org.junit.jupiter.api.extension.ExtendWith;
48 import org.mockito.ArgumentCaptor;
49 import org.mockito.Mock;
50 import org.mockito.junit.jupiter.MockitoExtension;
// Test class for PipelineControllerFuture; MockitoExtension is registered for the whole class.
52 @ExtendWith(MockitoExtension.class)
53 class PipelineControllerFutureTest {
// Exception handed to the controller/futures by the tests that exercise exceptional completion.
54 private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
// Value used as the normal completion result throughout the tests.
55 private static final String TEXT = "some text";
// Runnables and futures that the setup code below registers with the controller; the
// tests inspect them with verify().
// NOTE(review): no initializers are visible for these fields or for executor - presumably
// they are Mockito mocks injected by the extension; confirm against the full file.
58 private Runnable runnable1;
61 private Runnable runnable2;
64 private Future<String> future1;
67 private Future<String> future2;
// Executor passed to completeAsync(supplier, executor) so the submitted task can be captured.
70 private Executor executor;
// Spy wrapped around a real CompletableFuture (created in the setup code below); used by the wrap() tests.
73 private CompletableFuture<String> compFuture;
// Object under test; instantiated in the setup code below.
74 private PipelineControllerFuture<String> controller;
// Fixture setup: creates the spy and a new controller, then registers both runnables and
// both futures with the controller.
// NOTE(review): the enclosing method header is not visible in this view.
78 * Initializes fields, including {@link #controller}. Adds all runners and futures to
83 compFuture = spy(new CompletableFuture<>());
85 controller = new PipelineControllerFuture<>();
87 controller.add(runnable1);
88 controller.add(future1);
89 controller.add(runnable2);
90 controller.add(future2);
// Checks that a freshly set-up controller is running, that cancel(false) returns true and
// leaves it cancelled and not running, and that a subsequent cancel(true) also returns
// true with the same resulting state.
94 void testCancel_testAddFutureOfFBoolean_testAddRunnable__testIsRunning() {
95 assertTrue(controller.isRunning());
97 assertTrue(controller.cancel(false));
99 assertTrue(controller.isCancelled());
100 assertFalse(controller.isRunning());
104 // re-invoke; nothing should change
105 assertTrue(controller.cancel(true));
107 assertTrue(controller.isCancelled());
108 assertFalse(controller.isRunning());
// Checks that complete(TEXT) returns true the first time, that get() then yields TEXT,
// and that a second complete() call returns false.
114 void testCompleteT() throws Exception {
115 assertTrue(controller.complete(TEXT));
116 assertEquals(TEXT, controller.get());
120 // repeat - disallowed
121 assertFalse(controller.complete(TEXT));
// Checks that completeExceptionally() returns true the first time, that get() then throws
// with EXPECTED_EXCEPTION as its cause, and that a second call returns false.
125 void testCompleteExceptionallyThrowable() {
126 assertTrue(controller.completeExceptionally(EXPECTED_EXCEPTION));
127 assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
131 // repeat - disallowed
132 assertFalse(controller.completeExceptionally(EXPECTED_EXCEPTION));
// Checks completeAsync(supplier, executor): the returned future stays incomplete and
// runnable1 is untouched until the task handed to the executor is run manually; after
// that the future is done and yields the supplier's value.
136 void testCompleteAsyncSupplierOfQextendsTExecutor() throws Exception {
137 CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
139 // haven't stopped anything yet
140 assertFalse(future.isDone());
141 verify(runnable1, never()).run();
143 // get the operation and run it
// The executor never runs the task itself here, so the test captures it from
// execute() and invokes it directly.
144 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
145 verify(executor).execute(captor.capture());
146 captor.getValue().run();
148 // should be done now
149 assertTrue(future.isDone());
151 assertEquals(TEXT, future.get());
// Cancels the future returned by completeAsync(supplier, executor); in the visible lines
// the task submitted to the executor is never run. The future must then be done and
// controller.get() must throw CancellationException.
157 * Tests completeAsync(executor) when canceled before execution.
160 void testCompleteAsyncSupplierOfQextendsTExecutorCanceled() {
161 CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
163 assertTrue(future.cancel(false));
167 assertTrue(future.isDone());
169 assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
// Checks that completeAsync(supplier), called without an explicit executor, produces a
// future whose get() yields the supplier's value.
173 void testCompleteAsyncSupplierOfQextendsT() throws Exception {
174 CompletableFuture<String> future = controller.completeAsync(() -> TEXT);
175 assertEquals(TEXT, future.get());
// Starts an asynchronous supplier, cancels the returned future, and only then counts the
// latch down; afterwards the future must be done and controller.get() must throw
// CancellationException.
// NOTE(review): most of the supplier lambda is not visible in this view - presumably it
// waits on the latch before returning; confirm against the full file.
181 * Tests completeAsync() when canceled.
184 void testCompleteAsyncSupplierOfQextendsTCanceled() {
185 CountDownLatch canceled = new CountDownLatch(1);
187 // run async, but await until canceled
188 CompletableFuture<String> future = controller.completeAsync(() -> {
191 } catch (InterruptedException e) {
198 assertTrue(future.cancel(false));
200 // let the future run now
201 canceled.countDown();
205 assertTrue(future.isDone());
207 assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
// Registers a latch count-down as an extra runnable on the controller, then checks that
// completeOnTimeout() with a 1 ms timeout yields TEXT and that the registered runnable
// is invoked within five seconds.
211 void testCompleteOnTimeoutTLongTimeUnit() throws Exception {
212 CountDownLatch stopped = new CountDownLatch(1);
213 controller.add(stopped::countDown);
215 CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 1, TimeUnit.MILLISECONDS);
217 assertEquals(TEXT, future.get());
220 * Must use latch instead of verifyStopped(), because the runnables may be
221 * executed asynchronously.
223 assertTrue(stopped.await(5, TimeUnit.SECONDS));
// Arms a five-second timeout whose fallback value is "timed out", completes the
// controller with TEXT straight away, and checks that the future yields TEXT rather than
// the fallback.
227 * Tests completeOnTimeout() when completed before the timeout.
230 void testCompleteOnTimeoutTLongTimeUnitNoTimeout() throws Exception {
231 CompletableFuture<String> future = controller.completeOnTimeout("timed out", 5, TimeUnit.SECONDS);
232 controller.complete(TEXT);
234 assertEquals(TEXT, future.get());
// Arms a five-second timeout, cancels the returned future immediately, and checks that
// controller.get() throws CancellationException.
240 * Tests completeOnTimeout() when canceled before the timeout.
243 void testCompleteOnTimeoutTLongTimeUnitCanceled() {
244 CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 5, TimeUnit.SECONDS);
245 assertTrue(future.cancel(true));
247 assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
// Checks that newIncompleteFuture() returns a non-null, not-yet-done future that is a
// different object from the controller itself.
253 void testNewIncompleteFuture() {
254 PipelineControllerFuture<String> future = controller.newIncompleteFuture();
255 assertNotNull(future);
// NOTE(review): given the declared type of "future" and the non-null check above, this
// instanceof assertion cannot fail; it is kept here unchanged.
256 assertTrue(future instanceof PipelineControllerFuture);
257 assertNotSame(controller, future);
258 assertFalse(future.isDone());
// Checks delayedComplete(): obtaining the callback has no effect by itself; invoking it
// with (TEXT, null) completes the controller with TEXT, stops it and runs runnable1.
// Invoking the callback again with an exception must not turn the result into an
// exceptional completion, and verify(runnable1).run() is asserted again afterwards.
262 void testDelayedComplete() throws Exception {
// runnable1 is added here in addition to the add(runnable1) call in the fixture setup.
263 controller.add(runnable1);
265 BiConsumer<String, Throwable> stopper = controller.delayedComplete();
267 // shouldn't have run yet
268 assertTrue(controller.isRunning());
269 verify(runnable1, never()).run();
271 stopper.accept(TEXT, null);
273 assertTrue(controller.isDone());
274 assertEquals(TEXT, controller.get());
276 assertFalse(controller.isRunning());
277 verify(runnable1).run();
279 // re-invoke; nothing should change
280 stopper.accept(TEXT, EXPECTED_EXCEPTION);
281 assertFalse(controller.isCompletedExceptionally());
283 assertFalse(controller.isRunning());
284 verify(runnable1).run();
// Checks delayedComplete() on the failure path: invoking the callback with
// EXPECTED_EXCEPTION completes the controller exceptionally (get() throws with that
// cause), stops it and runs runnable1. Invoking the callback again with a null throwable
// must leave the controller exceptionally completed.
288 * Tests delayedComplete() when an exception is generated.
291 void testDelayedCompleteWithException() {
// runnable1 is added here in addition to the add(runnable1) call in the fixture setup.
292 controller.add(runnable1);
294 BiConsumer<String, Throwable> stopper = controller.delayedComplete();
296 // shouldn't have run yet
297 assertTrue(controller.isRunning());
298 verify(runnable1, never()).run();
300 stopper.accept(TEXT, EXPECTED_EXCEPTION);
302 assertTrue(controller.isDone());
303 assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
305 assertFalse(controller.isRunning());
306 verify(runnable1).run();
308 // re-invoke; nothing should change
309 stopper.accept(TEXT, null);
310 assertTrue(controller.isCompletedExceptionally());
312 assertFalse(controller.isRunning());
313 verify(runnable1).run();
// Checks delayedRemove(Future): invoking the returned callback neither completes the
// controller nor cancels future1. When the controller is completed afterwards through
// delayedComplete(), future1 is still never cancelled while future2 is.
317 void testDelayedRemoveFutureOfF() {
318 BiConsumer<String, Throwable> remover = controller.delayedRemove(future1);
320 remover.accept(TEXT, EXPECTED_EXCEPTION);
322 // should not have completed the controller
323 assertFalse(controller.isDone());
325 verify(future1, never()).cancel(anyBoolean());
// Complete the controller so that whatever is still registered gets stopped.
327 controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
329 verify(future1, never()).cancel(anyBoolean());
330 verify(future2).cancel(anyBoolean());
// Checks delayedRemove(Runnable): invoking the returned callback neither completes the
// controller nor runs runnable1. When the controller is completed afterwards through
// delayedComplete(), runnable1 is still never run while runnable2 is.
334 void testDelayedRemoveRunnable() {
335 BiConsumer<String, Throwable> remover = controller.delayedRemove(runnable1);
337 remover.accept(TEXT, EXPECTED_EXCEPTION);
339 // should not have completed the controller
340 assertFalse(controller.isDone());
342 verify(runnable1, never()).run();
// Complete the controller so that whatever is still registered gets stopped.
344 controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
346 verify(runnable1, never()).run();
347 verify(runnable2).run();
// Removes runnable2 and future1 from the controller, cancels it, and checks that only the
// items still registered (runnable1 and future2) were run / cancelled.
351 void testRemoveFutureOfF_testRemoveRunnable() {
352 controller.remove(runnable2);
353 controller.remove(future1);
355 controller.cancel(true);
357 verify(runnable1).run();
358 verify(runnable2, never()).run();
359 verify(future1, never()).cancel(anyBoolean());
360 verify(future2).cancel(anyBoolean());
// Wraps compFuture through a spied controller: remove(compFuture) must not be called
// while compFuture is pending, the wrapped future must yield the value compFuture is
// completed with, and remove(compFuture) must have been called afterwards.
364 * Tests both wrap() methods.
367 void testWrap() throws Exception {
// Spy on the controller so its add()/remove() calls can be verified.
368 controller = spy(controller);
370 CompletableFuture<String> future = controller.wrap(compFuture);
371 verify(controller, never()).remove(compFuture);
373 compFuture.complete(TEXT);
374 assertEquals(TEXT, future.get());
376 verify(controller).remove(compFuture);
// Cancels the controller first, then wraps compFuture: the returned future must not be
// done, the controller must neither add nor remove compFuture, and compFuture itself
// must be cancelled.
380 * Tests wrap(), when the controller is not running.
383 void testWrapNotRunning() {
// Cancel before creating the spy, so the spy only records the calls made by wrap().
384 controller.cancel(false);
385 controller = spy(controller);
387 assertFalse(controller.wrap(compFuture).isDone());
388 verify(controller, never()).add(compFuture);
389 verify(controller, never()).remove(compFuture);
391 verify(compFuture).cancel(anyBoolean());
// Wraps compFuture through a spied controller and completes compFuture exceptionally:
// the wrapped future's get() must throw with EXPECTED_EXCEPTION as its cause, and
// remove(compFuture) must be called only after that completion.
395 * Tests wrap(), when the future throws an exception.
398 void testWrapException() {
// Spy on the controller so its remove() calls can be verified.
399 controller = spy(controller);
401 CompletableFuture<String> future = controller.wrap(compFuture);
402 verify(controller, never()).remove(compFuture);
404 compFuture.completeExceptionally(EXPECTED_EXCEPTION);
405 assertThatThrownBy(() -> future.get()).hasCause(EXPECTED_EXCEPTION);
407 verify(controller).remove(compFuture);
// Checks wrap(Function): applying the wrapped function to TEXT invokes the supplied
// function (compFuture ends up done), the resulting future yields TEXT, and the
// controller itself stays incomplete.
// NOTE(review): the tail of the lambda is not visible in this view - presumably it
// returns compFuture; confirm against the full file.
411 void testWrapFunction() throws Exception {
413 Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
414 compFuture.complete(input);
418 CompletableFuture<String> future = func.apply(TEXT);
419 assertTrue(compFuture.isDone());
421 assertEquals(TEXT, future.get());
423 // should not have completed the controller
424 assertFalse(controller.isDone());
// Applies a wrapped function that returns the still-pending compFuture, then cancels the
// controller and checks that the cancellation reaches compFuture.
428 * Tests wrap(Function) when the controller is canceled after the future is added.
431 void testWrapFunctionCancel() {
432 Function<String, CompletableFuture<String>> func = controller.wrap(input -> compFuture);
434 CompletableFuture<String> future = func.apply(TEXT);
// Neither the wrapper nor the underlying future has completed at this point.
435 assertFalse(future.isDone());
437 assertFalse(compFuture.isDone());
439 // cancel - should propagate
440 controller.cancel(false);
442 verify(compFuture).cancel(anyBoolean());
// Cancels the controller before applying a wrapped function: the returned future must be
// a different object from compFuture, must not be done, and "value" must still be null.
// NOTE(review): the lambda body is not visible in this view - presumably it stores its
// input in "value", so a null value means the function was never invoked; confirm
// against the full file.
446 * Tests wrap(Function) when the controller is not running.
449 void testWrapFunctionNotRunning() {
450 AtomicReference<String> value = new AtomicReference<>();
452 Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
457 controller.cancel(false);
459 CompletableFuture<String> fut = func.apply(TEXT);
460 assertNotSame(compFuture, fut);
461 assertFalse(fut.isDone());
463 assertNull(value.get());
// Helper assertion: verifies that both runnables were run and that cancel() was called
// on both futures (with any boolean argument).
466 private void verifyStopped() {
467 verify(runnable1).run();
468 verify(runnable2).run();
469 verify(future1).cancel(anyBoolean());
470 verify(future2).cancel(anyBoolean());