2c8275986cb5e0656834decf7e1958e04226369f
[policy/models.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.actorserviceprovider.pipeline;
23
24 import static org.assertj.core.api.Assertions.assertThatThrownBy;
25 import static org.junit.jupiter.api.Assertions.assertEquals;
26 import static org.junit.jupiter.api.Assertions.assertFalse;
27 import static org.junit.jupiter.api.Assertions.assertNotNull;
28 import static org.junit.jupiter.api.Assertions.assertNotSame;
29 import static org.junit.jupiter.api.Assertions.assertNull;
30 import static org.junit.jupiter.api.Assertions.assertTrue;
31 import static org.mockito.ArgumentMatchers.anyBoolean;
32 import static org.mockito.Mockito.never;
33 import static org.mockito.Mockito.spy;
34 import static org.mockito.Mockito.verify;
35
36 import java.util.concurrent.CancellationException;
37 import java.util.concurrent.CompletableFuture;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.Executor;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicReference;
43 import java.util.function.BiConsumer;
44 import java.util.function.Function;
45 import org.junit.jupiter.api.BeforeEach;
46 import org.junit.jupiter.api.Test;
47 import org.junit.jupiter.api.extension.ExtendWith;
48 import org.mockito.ArgumentCaptor;
49 import org.mockito.Mock;
50 import org.mockito.junit.jupiter.MockitoExtension;
51
52 @ExtendWith(MockitoExtension.class)
53 class PipelineControllerFutureTest {
54     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
55     private static final String TEXT = "some text";
56
57     @Mock
58     private Runnable runnable1;
59
60     @Mock
61     private Runnable runnable2;
62
63     @Mock
64     private Future<String> future1;
65
66     @Mock
67     private Future<String> future2;
68
69     @Mock
70     private Executor executor;
71
72
73     private CompletableFuture<String> compFuture;
74     private PipelineControllerFuture<String> controller;
75
76
77     /**
78      * Initializes fields, including {@link #controller}. Adds all runners and futures to
79      * the controller.
80      */
81     @BeforeEach
82     void setUp() {
83         compFuture = spy(new CompletableFuture<>());
84
85         controller = new PipelineControllerFuture<>();
86
87         controller.add(runnable1);
88         controller.add(future1);
89         controller.add(runnable2);
90         controller.add(future2);
91     }
92
93     @Test
94      void testCancel_testAddFutureOfFBoolean_testAddRunnable__testIsRunning() {
95         assertTrue(controller.isRunning());
96
97         assertTrue(controller.cancel(false));
98
99         assertTrue(controller.isCancelled());
100         assertFalse(controller.isRunning());
101
102         verifyStopped();
103
104         // re-invoke; nothing should change
105         assertTrue(controller.cancel(true));
106
107         assertTrue(controller.isCancelled());
108         assertFalse(controller.isRunning());
109
110         verifyStopped();
111     }
112
113     @Test
114      void testCompleteT() throws Exception {
115         assertTrue(controller.complete(TEXT));
116         assertEquals(TEXT, controller.get());
117
118         verifyStopped();
119
120         // repeat - disallowed
121         assertFalse(controller.complete(TEXT));
122     }
123
124     @Test
125      void testCompleteExceptionallyThrowable() {
126         assertTrue(controller.completeExceptionally(EXPECTED_EXCEPTION));
127         assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
128
129         verifyStopped();
130
131         // repeat - disallowed
132         assertFalse(controller.completeExceptionally(EXPECTED_EXCEPTION));
133     }
134
135     @Test
136      void testCompleteAsyncSupplierOfQextendsTExecutor() throws Exception {
137         CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
138
139         // haven't stopped anything yet
140         assertFalse(future.isDone());
141         verify(runnable1, never()).run();
142
143         // get the operation and run it
144         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
145         verify(executor).execute(captor.capture());
146         captor.getValue().run();
147
148         // should be done now
149         assertTrue(future.isDone());
150
151         assertEquals(TEXT, future.get());
152
153         verifyStopped();
154     }
155
156     /**
157      * Tests completeAsync(executor) when canceled before execution.
158      */
159     @Test
160      void testCompleteAsyncSupplierOfQextendsTExecutorCanceled() {
161         CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
162
163         assertTrue(future.cancel(false));
164
165         verifyStopped();
166
167         assertTrue(future.isDone());
168
169         assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
170     }
171
172     @Test
173      void testCompleteAsyncSupplierOfQextendsT() throws Exception {
174         CompletableFuture<String> future = controller.completeAsync(() -> TEXT);
175         assertEquals(TEXT, future.get());
176
177         verifyStopped();
178     }
179
180     /**
181      * Tests completeAsync() when canceled.
182      */
183     @Test
184      void testCompleteAsyncSupplierOfQextendsTCanceled() {
185         CountDownLatch canceled = new CountDownLatch(1);
186
187         // run async, but await until canceled
188         CompletableFuture<String> future = controller.completeAsync(() -> {
189             try {
190                 canceled.await();
191             } catch (InterruptedException e) {
192                 // do nothing
193             }
194
195             return TEXT;
196         });
197
198         assertTrue(future.cancel(false));
199
200         // let the future run now
201         canceled.countDown();
202
203         verifyStopped();
204
205         assertTrue(future.isDone());
206
207         assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
208     }
209
210     @Test
211      void testCompleteOnTimeoutTLongTimeUnit() throws Exception {
212         CountDownLatch stopped = new CountDownLatch(1);
213         controller.add(stopped::countDown);
214
215         CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 1, TimeUnit.MILLISECONDS);
216
217         assertEquals(TEXT, future.get());
218
219         /*
220          * Must use latch instead of verifyStopped(), because the runnables may be
221          * executed asynchronously.
222          */
223         assertTrue(stopped.await(5, TimeUnit.SECONDS));
224     }
225
226     /**
227      * Tests completeOnTimeout() when completed before the timeout.
228      */
229     @Test
230      void testCompleteOnTimeoutTLongTimeUnitNoTimeout() throws Exception {
231         CompletableFuture<String> future = controller.completeOnTimeout("timed out", 5, TimeUnit.SECONDS);
232         controller.complete(TEXT);
233
234         assertEquals(TEXT, future.get());
235
236         verifyStopped();
237     }
238
239     /**
240      * Tests completeOnTimeout() when canceled before the timeout.
241      */
242     @Test
243      void testCompleteOnTimeoutTLongTimeUnitCanceled() {
244         CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 5, TimeUnit.SECONDS);
245         assertTrue(future.cancel(true));
246
247         assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
248
249         verifyStopped();
250     }
251
252     @Test
253      void testNewIncompleteFuture() {
254         PipelineControllerFuture<String> future = controller.newIncompleteFuture();
255         assertNotNull(future);
256         assertTrue(future instanceof PipelineControllerFuture);
257         assertNotSame(controller, future);
258         assertFalse(future.isDone());
259     }
260
261     @Test
262      void testDelayedComplete() throws Exception {
263         controller.add(runnable1);
264
265         BiConsumer<String, Throwable> stopper = controller.delayedComplete();
266
267         // shouldn't have run yet
268         assertTrue(controller.isRunning());
269         verify(runnable1, never()).run();
270
271         stopper.accept(TEXT, null);
272
273         assertTrue(controller.isDone());
274         assertEquals(TEXT, controller.get());
275
276         assertFalse(controller.isRunning());
277         verify(runnable1).run();
278
279         // re-invoke; nothing should change
280         stopper.accept(TEXT, EXPECTED_EXCEPTION);
281         assertFalse(controller.isCompletedExceptionally());
282
283         assertFalse(controller.isRunning());
284         verify(runnable1).run();
285     }
286
287     /**
288      * Tests delayedComplete() when an exception is generated.
289      */
290     @Test
291      void testDelayedCompleteWithException() {
292         controller.add(runnable1);
293
294         BiConsumer<String, Throwable> stopper = controller.delayedComplete();
295
296         // shouldn't have run yet
297         assertTrue(controller.isRunning());
298         verify(runnable1, never()).run();
299
300         stopper.accept(TEXT, EXPECTED_EXCEPTION);
301
302         assertTrue(controller.isDone());
303         assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
304
305         assertFalse(controller.isRunning());
306         verify(runnable1).run();
307
308         // re-invoke; nothing should change
309         stopper.accept(TEXT, null);
310         assertTrue(controller.isCompletedExceptionally());
311
312         assertFalse(controller.isRunning());
313         verify(runnable1).run();
314     }
315
316     @Test
317      void testDelayedRemoveFutureOfF() {
318         BiConsumer<String, Throwable> remover = controller.delayedRemove(future1);
319
320         remover.accept(TEXT, EXPECTED_EXCEPTION);
321
322         // should not have completed the controller
323         assertFalse(controller.isDone());
324
325         verify(future1, never()).cancel(anyBoolean());
326
327         controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
328
329         verify(future1, never()).cancel(anyBoolean());
330         verify(future2).cancel(anyBoolean());
331     }
332
333     @Test
334      void testDelayedRemoveRunnable() {
335         BiConsumer<String, Throwable> remover = controller.delayedRemove(runnable1);
336
337         remover.accept(TEXT, EXPECTED_EXCEPTION);
338
339         // should not have completed the controller
340         assertFalse(controller.isDone());
341
342         verify(runnable1, never()).run();
343
344         controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
345
346         verify(runnable1, never()).run();
347         verify(runnable2).run();
348     }
349
350     @Test
351      void testRemoveFutureOfF_testRemoveRunnable() {
352         controller.remove(runnable2);
353         controller.remove(future1);
354
355         controller.cancel(true);
356
357         verify(runnable1).run();
358         verify(runnable2, never()).run();
359         verify(future1, never()).cancel(anyBoolean());
360         verify(future2).cancel(anyBoolean());
361     }
362
363     /**
364      * Tests both wrap() methods.
365      */
366     @Test
367      void testWrap() throws Exception {
368         controller = spy(controller);
369
370         CompletableFuture<String> future = controller.wrap(compFuture);
371         verify(controller, never()).remove(compFuture);
372
373         compFuture.complete(TEXT);
374         assertEquals(TEXT, future.get());
375
376         verify(controller).remove(compFuture);
377     }
378
379     /**
380      * Tests wrap(), when the controller is not running.
381      */
382     @Test
383      void testWrapNotRunning() {
384         controller.cancel(false);
385         controller = spy(controller);
386
387         assertFalse(controller.wrap(compFuture).isDone());
388         verify(controller, never()).add(compFuture);
389         verify(controller, never()).remove(compFuture);
390
391         verify(compFuture).cancel(anyBoolean());
392     }
393
394     /**
395      * Tests wrap(), when the future throws an exception.
396      */
397     @Test
398      void testWrapException() {
399         controller = spy(controller);
400
401         CompletableFuture<String> future = controller.wrap(compFuture);
402         verify(controller, never()).remove(compFuture);
403
404         compFuture.completeExceptionally(EXPECTED_EXCEPTION);
405         assertThatThrownBy(() -> future.get()).hasCause(EXPECTED_EXCEPTION);
406
407         verify(controller).remove(compFuture);
408     }
409
410     @Test
411      void testWrapFunction() throws Exception {
412
413         Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
414             compFuture.complete(input);
415             return compFuture;
416         });
417
418         CompletableFuture<String> future = func.apply(TEXT);
419         assertTrue(compFuture.isDone());
420
421         assertEquals(TEXT, future.get());
422
423         // should not have completed the controller
424         assertFalse(controller.isDone());
425     }
426
427     /**
428      * Tests wrap(Function) when the controller is canceled after the future is added.
429      */
430     @Test
431      void testWrapFunctionCancel() {
432         Function<String, CompletableFuture<String>> func = controller.wrap(input -> compFuture);
433
434         CompletableFuture<String> future = func.apply(TEXT);
435         assertFalse(future.isDone());
436
437         assertFalse(compFuture.isDone());
438
439         // cancel - should propagate
440         controller.cancel(false);
441
442         verify(compFuture).cancel(anyBoolean());
443     }
444
445     /**
446      * Tests wrap(Function) when the controller is not running.
447      */
448     @Test
449      void testWrapFunctionNotRunning() {
450         AtomicReference<String> value = new AtomicReference<>();
451
452         Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
453             value.set(input);
454             return compFuture;
455         });
456
457         controller.cancel(false);
458
459         CompletableFuture<String> fut = func.apply(TEXT);
460         assertNotSame(compFuture, fut);
461         assertFalse(fut.isDone());
462
463         assertNull(value.get());
464     }
465
466     private void verifyStopped() {
467         verify(runnable1).run();
468         verify(runnable2).run();
469         verify(future1).cancel(anyBoolean());
470         verify(future2).cancel(anyBoolean());
471     }
472 }