Fix sonars from dependency upgrade
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / pipeline / PipelineControllerFutureTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.pipeline;
22
23 import static org.assertj.core.api.Assertions.assertThatThrownBy;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertNotSame;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertTrue;
30 import static org.mockito.ArgumentMatchers.anyBoolean;
31 import static org.mockito.Mockito.never;
32 import static org.mockito.Mockito.spy;
33 import static org.mockito.Mockito.verify;
34
35 import java.util.concurrent.CancellationException;
36 import java.util.concurrent.CompletableFuture;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicReference;
42 import java.util.function.BiConsumer;
43 import java.util.function.Function;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.junit.runner.RunWith;
47 import org.mockito.ArgumentCaptor;
48 import org.mockito.Mock;
49 import org.mockito.junit.MockitoJUnitRunner;
50
51 @RunWith(MockitoJUnitRunner.class)
52 public class PipelineControllerFutureTest {
53     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
54     private static final String TEXT = "some text";
55
56     @Mock
57     private Runnable runnable1;
58
59     @Mock
60     private Runnable runnable2;
61
62     @Mock
63     private Future<String> future1;
64
65     @Mock
66     private Future<String> future2;
67
68     @Mock
69     private Executor executor;
70
71
72     private CompletableFuture<String> compFuture;
73     private PipelineControllerFuture<String> controller;
74
75
76     /**
77      * Initializes fields, including {@link #controller}. Adds all runners and futures to
78      * the controller.
79      */
80     @Before
81     public void setUp() {
82         compFuture = spy(new CompletableFuture<>());
83
84         controller = new PipelineControllerFuture<>();
85
86         controller.add(runnable1);
87         controller.add(future1);
88         controller.add(runnable2);
89         controller.add(future2);
90     }
91
92     @Test
93     public void testCancel_testAddFutureOfFBoolean_testAddRunnable__testIsRunning() {
94         assertTrue(controller.isRunning());
95
96         assertTrue(controller.cancel(false));
97
98         assertTrue(controller.isCancelled());
99         assertFalse(controller.isRunning());
100
101         verifyStopped();
102
103         // re-invoke; nothing should change
104         assertTrue(controller.cancel(true));
105
106         assertTrue(controller.isCancelled());
107         assertFalse(controller.isRunning());
108
109         verifyStopped();
110     }
111
112     @Test
113     public void testCompleteT() throws Exception {
114         assertTrue(controller.complete(TEXT));
115         assertEquals(TEXT, controller.get());
116
117         verifyStopped();
118
119         // repeat - disallowed
120         assertFalse(controller.complete(TEXT));
121     }
122
123     @Test
124     public void testCompleteExceptionallyThrowable() {
125         assertTrue(controller.completeExceptionally(EXPECTED_EXCEPTION));
126         assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
127
128         verifyStopped();
129
130         // repeat - disallowed
131         assertFalse(controller.completeExceptionally(EXPECTED_EXCEPTION));
132     }
133
134     @Test
135     public void testCompleteAsyncSupplierOfQextendsTExecutor() throws Exception {
136         CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
137
138         // haven't stopped anything yet
139         assertFalse(future.isDone());
140         verify(runnable1, never()).run();
141
142         // get the operation and run it
143         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
144         verify(executor).execute(captor.capture());
145         captor.getValue().run();
146
147         // should be done now
148         assertTrue(future.isDone());
149
150         assertEquals(TEXT, future.get());
151
152         verifyStopped();
153     }
154
155     /**
156      * Tests completeAsync(executor) when canceled before execution.
157      */
158     @Test
159     public void testCompleteAsyncSupplierOfQextendsTExecutorCanceled() throws Exception {
160         CompletableFuture<String> future = controller.completeAsync(() -> TEXT, executor);
161
162         assertTrue(future.cancel(false));
163
164         verifyStopped();
165
166         assertTrue(future.isDone());
167
168         assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
169     }
170
171     @Test
172     public void testCompleteAsyncSupplierOfQextendsT() throws Exception {
173         CompletableFuture<String> future = controller.completeAsync(() -> TEXT);
174         assertEquals(TEXT, future.get());
175
176         verifyStopped();
177     }
178
179     /**
180      * Tests completeAsync() when canceled.
181      */
182     @Test
183     public void testCompleteAsyncSupplierOfQextendsTCanceled() throws Exception {
184         CountDownLatch canceled = new CountDownLatch(1);
185
186         // run async, but await until canceled
187         CompletableFuture<String> future = controller.completeAsync(() -> {
188             try {
189                 canceled.await();
190             } catch (InterruptedException e) {
191                 // do nothing
192             }
193
194             return TEXT;
195         });
196
197         assertTrue(future.cancel(false));
198
199         // let the future run now
200         canceled.countDown();
201
202         verifyStopped();
203
204         assertTrue(future.isDone());
205
206         assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
207     }
208
209     @Test
210     public void testCompleteOnTimeoutTLongTimeUnit() throws Exception {
211         CountDownLatch stopped = new CountDownLatch(1);
212         controller.add(() -> stopped.countDown());
213
214         CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 1, TimeUnit.MILLISECONDS);
215
216         assertEquals(TEXT, future.get());
217
218         /*
219          * Must use latch instead of verifyStopped(), because the runnables may be
220          * executed asynchronously.
221          */
222         assertTrue(stopped.await(5, TimeUnit.SECONDS));
223     }
224
225     /**
226      * Tests completeOnTimeout() when completed before the timeout.
227      */
228     @Test
229     public void testCompleteOnTimeoutTLongTimeUnitNoTimeout() throws Exception {
230         CompletableFuture<String> future = controller.completeOnTimeout("timed out", 5, TimeUnit.SECONDS);
231         controller.complete(TEXT);
232
233         assertEquals(TEXT, future.get());
234
235         verifyStopped();
236     }
237
238     /**
239      * Tests completeOnTimeout() when canceled before the timeout.
240      */
241     @Test
242     public void testCompleteOnTimeoutTLongTimeUnitCanceled() {
243         CompletableFuture<String> future = controller.completeOnTimeout(TEXT, 5, TimeUnit.SECONDS);
244         assertTrue(future.cancel(true));
245
246         assertThatThrownBy(() -> controller.get()).isInstanceOf(CancellationException.class);
247
248         verifyStopped();
249     }
250
251     @Test
252     public void testNewIncompleteFuture() {
253         PipelineControllerFuture<String> future = controller.newIncompleteFuture();
254         assertNotNull(future);
255         assertTrue(future instanceof PipelineControllerFuture);
256         assertNotSame(controller, future);
257         assertFalse(future.isDone());
258     }
259
260     @Test
261     public void testDelayedComplete() throws Exception {
262         controller.add(runnable1);
263
264         BiConsumer<String, Throwable> stopper = controller.delayedComplete();
265
266         // shouldn't have run yet
267         assertTrue(controller.isRunning());
268         verify(runnable1, never()).run();
269
270         stopper.accept(TEXT, null);
271
272         assertTrue(controller.isDone());
273         assertEquals(TEXT, controller.get());
274
275         assertFalse(controller.isRunning());
276         verify(runnable1).run();
277
278         // re-invoke; nothing should change
279         stopper.accept(TEXT, EXPECTED_EXCEPTION);
280         assertFalse(controller.isCompletedExceptionally());
281
282         assertFalse(controller.isRunning());
283         verify(runnable1).run();
284     }
285
286     /**
287      * Tests delayedComplete() when an exception is generated.
288      */
289     @Test
290     public void testDelayedCompleteWithException() throws Exception {
291         controller.add(runnable1);
292
293         BiConsumer<String, Throwable> stopper = controller.delayedComplete();
294
295         // shouldn't have run yet
296         assertTrue(controller.isRunning());
297         verify(runnable1, never()).run();
298
299         stopper.accept(TEXT, EXPECTED_EXCEPTION);
300
301         assertTrue(controller.isDone());
302         assertThatThrownBy(() -> controller.get()).hasCause(EXPECTED_EXCEPTION);
303
304         assertFalse(controller.isRunning());
305         verify(runnable1).run();
306
307         // re-invoke; nothing should change
308         stopper.accept(TEXT, null);
309         assertTrue(controller.isCompletedExceptionally());
310
311         assertFalse(controller.isRunning());
312         verify(runnable1).run();
313     }
314
315     @Test
316     public void testDelayedRemoveFutureOfF() throws Exception {
317         BiConsumer<String, Throwable> remover = controller.delayedRemove(future1);
318
319         remover.accept(TEXT, EXPECTED_EXCEPTION);
320
321         // should not have completed the controller
322         assertFalse(controller.isDone());
323
324         verify(future1, never()).cancel(anyBoolean());
325
326         controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
327
328         verify(future1, never()).cancel(anyBoolean());
329         verify(future2).cancel(anyBoolean());
330     }
331
332     @Test
333     public void testDelayedRemoveRunnable() throws Exception {
334         BiConsumer<String, Throwable> remover = controller.delayedRemove(runnable1);
335
336         remover.accept(TEXT, EXPECTED_EXCEPTION);
337
338         // should not have completed the controller
339         assertFalse(controller.isDone());
340
341         verify(runnable1, never()).run();
342
343         controller.delayedComplete().accept(TEXT, EXPECTED_EXCEPTION);
344
345         verify(runnable1, never()).run();
346         verify(runnable2).run();
347     }
348
349     @Test
350     public void testRemoveFutureOfF_testRemoveRunnable() {
351         controller.remove(runnable2);
352         controller.remove(future1);
353
354         controller.cancel(true);
355
356         verify(runnable1).run();
357         verify(runnable2, never()).run();
358         verify(future1, never()).cancel(anyBoolean());
359         verify(future2).cancel(anyBoolean());
360     }
361
362     /**
363      * Tests both wrap() methods.
364      */
365     @Test
366     public void testWrap() throws Exception {
367         controller = spy(controller);
368
369         CompletableFuture<String> future = controller.wrap(compFuture);
370         verify(controller, never()).remove(compFuture);
371
372         compFuture.complete(TEXT);
373         assertEquals(TEXT, future.get());
374
375         verify(controller).remove(compFuture);
376     }
377
378     /**
379      * Tests wrap(), when the controller is not running.
380      */
381     @Test
382     public void testWrapNotRunning() throws Exception {
383         controller.cancel(false);
384         controller = spy(controller);
385
386         assertFalse(controller.wrap(compFuture).isDone());
387         verify(controller, never()).add(compFuture);
388         verify(controller, never()).remove(compFuture);
389
390         verify(compFuture).cancel(anyBoolean());
391     }
392
393     /**
394      * Tests wrap(), when the future throws an exception.
395      */
396     @Test
397     public void testWrapException() throws Exception {
398         controller = spy(controller);
399
400         CompletableFuture<String> future = controller.wrap(compFuture);
401         verify(controller, never()).remove(compFuture);
402
403         compFuture.completeExceptionally(EXPECTED_EXCEPTION);
404         assertThatThrownBy(() -> future.get()).hasCause(EXPECTED_EXCEPTION);
405
406         verify(controller).remove(compFuture);
407     }
408
409     @Test
410     public void testWrapFunction() throws Exception {
411
412         Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
413             compFuture.complete(input);
414             return compFuture;
415         });
416
417         CompletableFuture<String> future = func.apply(TEXT);
418         assertTrue(compFuture.isDone());
419
420         assertEquals(TEXT, future.get());
421
422         // should not have completed the controller
423         assertFalse(controller.isDone());
424     }
425
426     /**
427      * Tests wrap(Function) when the controller is canceled after the future is added.
428      */
429     @Test
430     public void testWrapFunctionCancel() throws Exception {
431         Function<String, CompletableFuture<String>> func = controller.wrap(input -> compFuture);
432
433         CompletableFuture<String> future = func.apply(TEXT);
434         assertFalse(future.isDone());
435
436         assertFalse(compFuture.isDone());
437
438         // cancel - should propagate
439         controller.cancel(false);
440
441         verify(compFuture).cancel(anyBoolean());
442     }
443
444     /**
445      * Tests wrap(Function) when the controller is not running.
446      */
447     @Test
448     public void testWrapFunctionNotRunning() {
449         AtomicReference<String> value = new AtomicReference<>();
450
451         Function<String, CompletableFuture<String>> func = controller.wrap(input -> {
452             value.set(input);
453             return compFuture;
454         });
455
456         controller.cancel(false);
457
458         CompletableFuture<String> fut = func.apply(TEXT);
459         assertNotSame(compFuture, fut);
460         assertFalse(fut.isDone());
461
462         assertNull(value.get());
463     }
464
465     private void verifyStopped() {
466         verify(runnable1).run();
467         verify(runnable2).run();
468         verify(future1).cancel(anyBoolean());
469         verify(future2).cancel(anyBoolean());
470     }
471 }