Clean up and enhancement of Actor re-design
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperatorPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertNull;
28 import static org.junit.Assert.assertSame;
29 import static org.junit.Assert.assertTrue;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.times;
33 import static org.mockito.Mockito.verify;
34
35 import java.time.Instant;
36 import java.util.Arrays;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Queue;
42 import java.util.TreeMap;
43 import java.util.UUID;
44 import java.util.concurrent.CompletableFuture;
45 import java.util.concurrent.CompletionException;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.Executor;
49 import java.util.concurrent.ForkJoinPool;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import java.util.concurrent.atomic.AtomicInteger;
55 import java.util.concurrent.atomic.AtomicReference;
56 import java.util.function.Consumer;
57 import java.util.function.Function;
58 import java.util.stream.Collectors;
59 import lombok.Getter;
60 import lombok.Setter;
61 import org.junit.Before;
62 import org.junit.Test;
63 import org.onap.policy.controlloop.ControlLoopOperation;
64 import org.onap.policy.controlloop.VirtualControlLoopEvent;
65 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
66 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
67 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
68 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
69 import org.onap.policy.controlloop.policy.PolicyResult;
70
71 public class OperatorPartialTest {
72     private static final int MAX_PARALLEL_REQUESTS = 10;
73     private static final String EXPECTED_EXCEPTION = "expected exception";
74     private static final String ACTOR = "my-actor";
75     private static final String OPERATOR = "my-operator";
76     private static final String TARGET = "my-target";
77     private static final int TIMEOUT = 1000;
78     private static final UUID REQ_ID = UUID.randomUUID();
79
80     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
81                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
82
83     private VirtualControlLoopEvent event;
84     private Map<String, Object> config;
85     private ControlLoopEventContext context;
86     private MyExec executor;
87     private ControlLoopOperationParams params;
88
89     private MyOper oper;
90
91     private int numStart;
92     private int numEnd;
93
94     private Instant tstart;
95
96     private OperationOutcome opstart;
97     private OperationOutcome opend;
98
99     /**
100      * Initializes the fields, including {@link #oper}.
101      */
102     @Before
103     public void setUp() {
104         event = new VirtualControlLoopEvent();
105         event.setRequestId(REQ_ID);
106
107         config = new TreeMap<>();
108         context = new ControlLoopEventContext(event);
109         executor = new MyExec();
110
111         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
112                         .executor(executor).actor(ACTOR).operation(OPERATOR).timeoutSec(TIMEOUT)
113                         .startCallback(this::starter).targetEntity(TARGET).build();
114
115         oper = new MyOper();
116         oper.configure(new TreeMap<>());
117         oper.start();
118
119         tstart = null;
120
121         opstart = null;
122         opend = null;
123     }
124
125     @Test
126     public void testOperatorPartial_testGetActorName_testGetName() {
127         assertEquals(ACTOR, oper.getActorName());
128         assertEquals(OPERATOR, oper.getName());
129         assertEquals(ACTOR + "." + OPERATOR, oper.getFullName());
130     }
131
132     @Test
133     public void testGetBlockingExecutor() throws InterruptedException {
134         CountDownLatch latch = new CountDownLatch(1);
135
136         /*
137          * Use an operator that doesn't override getBlockingExecutor().
138          */
139         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {};
140         oper2.getBlockingExecutor().execute(() -> latch.countDown());
141
142         assertTrue(latch.await(5, TimeUnit.SECONDS));
143     }
144
145     @Test
146     public void testDoConfigure() {
147         oper = spy(new MyOper());
148
149         oper.configure(config);
150         verify(oper).configure(config);
151
152         // repeat - SHOULD be run again
153         oper.configure(config);
154         verify(oper, times(2)).configure(config);
155     }
156
157     @Test
158     public void testDoStart() {
159         oper = spy(new MyOper());
160
161         oper.configure(config);
162         oper.start();
163
164         verify(oper).doStart();
165
166         // others should not have been invoked
167         verify(oper, never()).doStop();
168         verify(oper, never()).doShutdown();
169     }
170
171     @Test
172     public void testDoStop() {
173         oper = spy(new MyOper());
174
175         oper.configure(config);
176         oper.start();
177         oper.stop();
178
179         verify(oper).doStop();
180
181         // should not have been re-invoked
182         verify(oper).doStart();
183
184         // others should not have been invoked
185         verify(oper, never()).doShutdown();
186     }
187
188     @Test
189     public void testDoShutdown() {
190         oper = spy(new MyOper());
191
192         oper.configure(config);
193         oper.start();
194         oper.shutdown();
195
196         verify(oper).doShutdown();
197
198         // should not have been re-invoked
199         verify(oper).doStart();
200
201         // others should not have been invoked
202         verify(oper, never()).doStop();
203     }
204
205     @Test
206     public void testStartOperation() {
207         verifyRun("testStartOperation", 1, 1, PolicyResult.SUCCESS);
208     }
209
210     /**
211      * Tests startOperation() when the operator is not running.
212      */
213     @Test
214     public void testStartOperationNotRunning() {
215         // use a new operator, one that hasn't been started yet
216         oper = new MyOper();
217         oper.configure(new TreeMap<>());
218
219         assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
220     }
221
222     /**
223      * Tests startOperation() when the operation has a preprocessor.
224      */
225     @Test
226     public void testStartOperationWithPreprocessor() {
227         AtomicInteger count = new AtomicInteger();
228
229         CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
230             count.incrementAndGet();
231             return makeSuccess();
232         }, executor);
233
234         oper.setPreProcessor(preproc);
235
236         verifyRun("testStartOperationWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
237
238         assertEquals(1, count.get());
239     }
240
241     /**
242      * Tests startOperation() with multiple running requests.
243      */
244     @Test
245     public void testStartOperationMultiple() {
246         for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
247             oper.startOperation(params);
248         }
249
250         assertTrue(executor.runAll());
251
252         assertNotNull(opstart);
253         assertNotNull(opend);
254         assertEquals(PolicyResult.SUCCESS, opend.getResult());
255
256         assertEquals(MAX_PARALLEL_REQUESTS, numStart);
257         assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
258         assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
259     }
260
261     /**
262      * Tests startPreprocessor() when the preprocessor returns a failure.
263      */
264     @Test
265     public void testStartPreprocessorFailure() {
266         oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure()));
267
268         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
269     }
270
271     /**
272      * Tests startPreprocessor() when the preprocessor throws an exception.
273      */
274     @Test
275     public void testStartPreprocessorException() {
276         // arrange for the preprocessor to throw an exception
277         oper.setPreProcessor(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
278
279         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
280     }
281
282     /**
283      * Tests startPreprocessor() when the pipeline is not running.
284      */
285     @Test
286     public void testStartPreprocessorNotRunning() {
287         // arrange for the preprocessor to return success, which will be ignored
288         oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess()));
289
290         oper.startOperation(params).cancel(false);
291         assertTrue(executor.runAll());
292
293         assertNull(opstart);
294         assertNull(opend);
295
296         assertEquals(0, numStart);
297         assertEquals(0, oper.getCount());
298         assertEquals(0, numEnd);
299     }
300
301     /**
302      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
303      */
304     @Test
305     public void testStartPreprocessorBuilderException() {
306         oper = new MyOper() {
307             @Override
308             protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
309                 throw new IllegalStateException(EXPECTED_EXCEPTION);
310             }
311         };
312
313         oper.configure(new TreeMap<>());
314         oper.start();
315
316         assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
317
318         // should be nothing in the queue
319         assertEquals(0, executor.getQueueLength());
320     }
321
322     @Test
323     public void testStartPreprocessorAsync() {
324         assertNull(oper.startPreprocessorAsync(params));
325     }
326
327     @Test
328     public void testStartOperationAsync() {
329         oper.startOperation(params);
330         assertTrue(executor.runAll());
331
332         assertEquals(1, oper.getCount());
333     }
334
335     @Test
336     public void testIsSuccess() {
337         OperationOutcome outcome = new OperationOutcome();
338
339         outcome.setResult(PolicyResult.SUCCESS);
340         assertTrue(oper.isSuccess(outcome));
341
342         for (PolicyResult failure : FAILURE_RESULTS) {
343             outcome.setResult(failure);
344             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
345         }
346     }
347
348     @Test
349     public void testIsActorFailed() {
350         assertFalse(oper.isActorFailed(null));
351
352         OperationOutcome outcome = params.makeOutcome();
353
354         // incorrect outcome
355         outcome.setResult(PolicyResult.SUCCESS);
356         assertFalse(oper.isActorFailed(outcome));
357
358         outcome.setResult(PolicyResult.FAILURE_RETRIES);
359         assertFalse(oper.isActorFailed(outcome));
360
361         // correct outcome
362         outcome.setResult(PolicyResult.FAILURE);
363
364         // incorrect actor
365         outcome.setActor(TARGET);
366         assertFalse(oper.isActorFailed(outcome));
367         outcome.setActor(null);
368         assertFalse(oper.isActorFailed(outcome));
369         outcome.setActor(ACTOR);
370
371         // incorrect operation
372         outcome.setOperation(TARGET);
373         assertFalse(oper.isActorFailed(outcome));
374         outcome.setOperation(null);
375         assertFalse(oper.isActorFailed(outcome));
376         outcome.setOperation(OPERATOR);
377
378         // correct values
379         assertTrue(oper.isActorFailed(outcome));
380     }
381
382     @Test
383     public void testDoOperation() {
384         /*
385          * Use an operator that doesn't override doOperation().
386          */
387         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {
388             @Override
389             protected Executor getBlockingExecutor() {
390                 return executor;
391             }
392         };
393
394         oper2.configure(new TreeMap<>());
395         oper2.start();
396
397         oper2.startOperation(params);
398         assertTrue(executor.runAll());
399
400         assertNotNull(opend);
401         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
402     }
403
404     @Test
405     public void testTimeout() throws Exception {
406
407         // use a real executor
408         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
409
410         // trigger timeout very quickly
411         oper = new MyOper() {
412             @Override
413             protected long getTimeOutMillis(Integer timeoutSec) {
414                 return 1;
415             }
416
417             @Override
418             protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params,
419                             int attempt, OperationOutcome outcome) {
420
421                 OperationOutcome outcome2 = params.makeOutcome();
422                 outcome2.setResult(PolicyResult.SUCCESS);
423
424                 /*
425                  * Create an incomplete future that will timeout after the operation's
426                  * timeout. If it fires before the other timer, then it will return a
427                  * SUCCESS outcome.
428                  */
429                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
430                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
431                                 params.getExecutor());
432
433                 return future;
434             }
435         };
436
437         oper.configure(new TreeMap<>());
438         oper.start();
439
440         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.startOperation(params).get().getResult());
441     }
442
443     /**
444      * Verifies that the timer doesn't encompass the preprocessor and doesn't stop the
445      * operation once the preprocessor completes.
446      */
447     @Test
448     public void testTimeoutInPreprocessor() throws Exception {
449
450         // use a real executor
451         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
452
453         // trigger timeout very quickly
454         oper = new MyOper() {
455             @Override
456             protected long getTimeOutMillis(Integer timeoutSec) {
457                 return 10;
458             }
459
460             @Override
461             protected Executor getBlockingExecutor() {
462                 return command -> {
463                     Thread thread = new Thread(command);
464                     thread.start();
465                 };
466             }
467
468             @Override
469             protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
470
471                 OperationOutcome outcome = makeSuccess();
472
473                 /*
474                  * Create an incomplete future that will timeout after the operation's
475                  * timeout. If it fires before the other timer, then it will return a
476                  * SUCCESS outcome.
477                  */
478                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
479                 future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome,
480                                 params.getExecutor());
481
482                 return future;
483             }
484         };
485
486         oper.configure(new TreeMap<>());
487         oper.start();
488
489         OperationOutcome result = oper.startOperation(params).get();
490         assertEquals(PolicyResult.SUCCESS, result.getResult());
491
492         assertNotNull(opstart);
493         assertNotNull(opend);
494         assertEquals(PolicyResult.SUCCESS, opend.getResult());
495
496         assertEquals(1, numStart);
497         assertEquals(1, oper.getCount());
498         assertEquals(1, numEnd);
499     }
500
501     /**
502      * Tests retry functions, when the count is set to zero and retries are exhausted.
503      */
504     @Test
505     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
506         params = params.toBuilder().retry(0).build();
507         oper.setMaxFailures(10);
508
509         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
510     }
511
512     /**
513      * Tests retry functions, when the count is null and retries are exhausted.
514      */
515     @Test
516     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
517         params = params.toBuilder().retry(null).build();
518         oper.setMaxFailures(10);
519
520         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
521     }
522
523     /**
524      * Tests retry functions, when retries are exhausted.
525      */
526     @Test
527     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
528         final int maxRetries = 3;
529         params = params.toBuilder().retry(maxRetries).build();
530         oper.setMaxFailures(10);
531
532         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
533                         PolicyResult.FAILURE_RETRIES);
534     }
535
536     /**
537      * Tests retry functions, when a success follows some retries.
538      */
539     @Test
540     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
541         params = params.toBuilder().retry(10).build();
542
543         final int maxFailures = 3;
544         oper.setMaxFailures(maxFailures);
545
546         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
547                         PolicyResult.SUCCESS);
548     }
549
550     /**
551      * Tests retry functions, when the outcome is {@code null}.
552      */
553     @Test
554     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
555
556         // arrange to return null from doOperation()
557         oper = new MyOper() {
558             @Override
559             protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt,
560                             OperationOutcome operation) {
561
562                 // update counters
563                 super.doOperation(params, attempt, operation);
564                 return null;
565             }
566         };
567
568         oper.configure(new TreeMap<>());
569         oper.start();
570
571         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
572     }
573
574     @Test
575     public void testIsSameOperation() {
576         assertFalse(oper.isSameOperation(null));
577
578         OperationOutcome outcome = params.makeOutcome();
579
580         // wrong actor - should be false
581         outcome.setActor(null);
582         assertFalse(oper.isSameOperation(outcome));
583         outcome.setActor(TARGET);
584         assertFalse(oper.isSameOperation(outcome));
585         outcome.setActor(ACTOR);
586
587         // wrong operation - should be null
588         outcome.setOperation(null);
589         assertFalse(oper.isSameOperation(outcome));
590         outcome.setOperation(TARGET);
591         assertFalse(oper.isSameOperation(outcome));
592         outcome.setOperation(OPERATOR);
593
594         assertTrue(oper.isSameOperation(outcome));
595     }
596
597     /**
598      * Tests handleFailure() when the outcome is a success.
599      */
600     @Test
601     public void testHandlePreprocessorFailureTrue() {
602         oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess()));
603         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
604     }
605
606     /**
607      * Tests handleFailure() when the outcome is <i>not</i> a success.
608      */
609     @Test
610     public void testHandlePreprocessorFailureFalse() throws Exception {
611         oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure()));
612         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
613     }
614
615     /**
616      * Tests handleFailure() when the outcome is {@code null}.
617      */
618     @Test
619     public void testHandlePreprocessorFailureNull() throws Exception {
620         // arrange to return null from the preprocessor
621         oper.setPreProcessor(CompletableFuture.completedFuture(null));
622
623         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
624     }
625
626     @Test
627     public void testFromException() {
628         // arrange to generate an exception when operation runs
629         oper.setGenException(true);
630
631         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
632     }
633
634     /**
635      * Tests fromException() when there is no exception.
636      */
637     @Test
638     public void testFromExceptionNoExcept() {
639         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
640     }
641
642     /**
643      * Tests both flavors of anyOf(), because one invokes the other.
644      */
645     @Test
646     public void testAnyOf() throws Exception {
647         // first task completes, others do not
648         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
649
650         final OperationOutcome outcome = params.makeOutcome();
651
652         tasks.add(CompletableFuture.completedFuture(outcome));
653         tasks.add(new CompletableFuture<>());
654         tasks.add(new CompletableFuture<>());
655
656         CompletableFuture<OperationOutcome> result = oper.anyOf(params, tasks);
657         assertTrue(executor.runAll());
658
659         assertTrue(result.isDone());
660         assertSame(outcome, result.get());
661
662         // second task completes, others do not
663         tasks = new LinkedList<>();
664
665         tasks.add(new CompletableFuture<>());
666         tasks.add(CompletableFuture.completedFuture(outcome));
667         tasks.add(new CompletableFuture<>());
668
669         result = oper.anyOf(params, tasks);
670         assertTrue(executor.runAll());
671
672         assertTrue(result.isDone());
673         assertSame(outcome, result.get());
674
675         // third task completes, others do not
676         tasks = new LinkedList<>();
677
678         tasks.add(new CompletableFuture<>());
679         tasks.add(new CompletableFuture<>());
680         tasks.add(CompletableFuture.completedFuture(outcome));
681
682         result = oper.anyOf(params, tasks);
683         assertTrue(executor.runAll());
684
685         assertTrue(result.isDone());
686         assertSame(outcome, result.get());
687     }
688
689     /**
690      * Tests both flavors of allOf(), because one invokes the other.
691      */
692     @Test
693     public void testAllOf() throws Exception {
694         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
695
696         final OperationOutcome outcome = params.makeOutcome();
697
698         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
699         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
700         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
701
702         tasks.add(future1);
703         tasks.add(future2);
704         tasks.add(future3);
705
706         CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
707
708         assertTrue(executor.runAll());
709         assertFalse(result.isDone());
710         future1.complete(outcome);
711
712         // complete 3 before 2
713         assertTrue(executor.runAll());
714         assertFalse(result.isDone());
715         future3.complete(outcome);
716
717         assertTrue(executor.runAll());
718         assertFalse(result.isDone());
719         future2.complete(outcome);
720
721         // all of them are now done
722         assertTrue(executor.runAll());
723         assertTrue(result.isDone());
724         assertSame(outcome, result.get());
725     }
726
727     @Test
728     public void testCombineOutcomes() throws Exception {
729         // only one outcome
730         verifyOutcomes(0, PolicyResult.SUCCESS);
731         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
732
733         // maximum is in different positions
734         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
735         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
736         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
737
738         // null outcome
739         final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
740         tasks.add(CompletableFuture.completedFuture(null));
741         CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
742
743         assertTrue(executor.runAll());
744         assertTrue(result.isDone());
745         assertNull(result.get());
746
747         // one throws an exception during execution
748         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
749
750         tasks.clear();
751         tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
752         tasks.add(CompletableFuture.failedFuture(except));
753         tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
754         result = oper.allOf(params, tasks);
755
756         assertTrue(executor.runAll());
757         assertTrue(result.isCompletedExceptionally());
758         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
759     }
760
761     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
762         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
763
764
765         OperationOutcome expectedOutcome = null;
766
767         for (int count = 0; count < results.length; ++count) {
768             OperationOutcome outcome = params.makeOutcome();
769             outcome.setResult(results[count]);
770             tasks.add(CompletableFuture.completedFuture(outcome));
771
772             if (count == expected) {
773                 expectedOutcome = outcome;
774             }
775         }
776
777         CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
778
779         assertTrue(executor.runAll());
780         assertTrue(result.isDone());
781         assertSame(expectedOutcome, result.get());
782     }
783
784     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
785                     final OperationOutcome taskOutcome) {
786
787         return outcome -> CompletableFuture.completedFuture(taskOutcome);
788     }
789
790     @Test
791     public void testDetmPriority() {
792         assertEquals(1, oper.detmPriority(null));
793
794         OperationOutcome outcome = params.makeOutcome();
795
796         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
797                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
798                         PolicyResult.FAILURE_EXCEPTION, 6);
799
800         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
801             outcome.setResult(ent.getKey());
802             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
803         }
804     }
805
806     /**
807      * Tests doTask(Future) when the controller is not running.
808      */
809     @Test
810     public void testDoTaskFutureNotRunning() throws Exception {
811         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
812
813         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
814         controller.complete(params.makeOutcome());
815
816         CompletableFuture<OperationOutcome> future =
817                         oper.doTask(params, controller, false, params.makeOutcome(), taskFuture);
818         assertFalse(future.isDone());
819         assertTrue(executor.runAll());
820
821         // should not have run the task
822         assertFalse(future.isDone());
823
824         // should have canceled the task future
825         assertTrue(taskFuture.isCancelled());
826     }
827
828     /**
829      * Tests doTask(Future) when the previous outcome was successful.
830      */
831     @Test
832     public void testDoTaskFutureSuccess() throws Exception {
833         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
834         final OperationOutcome taskOutcome = params.makeOutcome();
835
836         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
837
838         CompletableFuture<OperationOutcome> future =
839                         oper.doTask(params, controller, true, params.makeOutcome(), taskFuture);
840
841         taskFuture.complete(taskOutcome);
842         assertTrue(executor.runAll());
843
844         assertTrue(future.isDone());
845         assertSame(taskOutcome, future.get());
846
847         // controller should not be done yet
848         assertFalse(controller.isDone());
849     }
850
851     /**
852      * Tests doTask(Future) when the previous outcome was failed.
853      */
854     @Test
855     public void testDoTaskFutureFailure() throws Exception {
856         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
857         final OperationOutcome failedOutcome = params.makeOutcome();
858         failedOutcome.setResult(PolicyResult.FAILURE);
859
860         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
861
862         CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, failedOutcome, taskFuture);
863         assertFalse(future.isDone());
864         assertTrue(executor.runAll());
865
866         // should not have run the task
867         assertFalse(future.isDone());
868
869         // should have canceled the task future
870         assertTrue(taskFuture.isCancelled());
871
872         // controller SHOULD be done now
873         assertTrue(controller.isDone());
874         assertSame(failedOutcome, controller.get());
875     }
876
877     /**
878      * Tests doTask(Future) when the previous outcome was failed, but not checking
879      * success.
880      */
881     @Test
882     public void testDoTaskFutureUncheckedFailure() throws Exception {
883         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
884         final OperationOutcome failedOutcome = params.makeOutcome();
885         failedOutcome.setResult(PolicyResult.FAILURE);
886
887         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
888
889         CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, failedOutcome, taskFuture);
890         assertFalse(future.isDone());
891
892         // complete the task
893         OperationOutcome taskOutcome = params.makeOutcome();
894         taskFuture.complete(taskOutcome);
895
896         assertTrue(executor.runAll());
897
898         // should have run the task
899         assertTrue(future.isDone());
900
901         assertTrue(future.isDone());
902         assertSame(taskOutcome, future.get());
903
904         // controller should not be done yet
905         assertFalse(controller.isDone());
906     }
907
908     /**
909      * Tests doTask(Function) when the controller is not running.
910      */
911     @Test
912     public void testDoTaskFunctionNotRunning() throws Exception {
913         AtomicBoolean invoked = new AtomicBoolean();
914
915         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
916             invoked.set(true);
917             return CompletableFuture.completedFuture(params.makeOutcome());
918         };
919
920         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
921         controller.complete(params.makeOutcome());
922
923         CompletableFuture<OperationOutcome> future =
924                         oper.doTask(params, controller, false, task).apply(params.makeOutcome());
925         assertFalse(future.isDone());
926         assertTrue(executor.runAll());
927
928         // should not have run the task
929         assertFalse(future.isDone());
930
931         // should not have even invoked the task
932         assertFalse(invoked.get());
933     }
934
935     /**
936      * Tests doTask(Function) when the previous outcome was successful.
937      */
938     @Test
939     public void testDoTaskFunctionSuccess() throws Exception {
940         final OperationOutcome taskOutcome = params.makeOutcome();
941
942         final OperationOutcome failedOutcome = params.makeOutcome();
943
944         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
945
946         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
947
948         CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome);
949
950         assertTrue(future.isDone());
951         assertSame(taskOutcome, future.get());
952
953         // controller should not be done yet
954         assertFalse(controller.isDone());
955     }
956
957     /**
958      * Tests doTask(Function) when the previous outcome was failed.
959      */
960     @Test
961     public void testDoTaskFunctionFailure() throws Exception {
962         final OperationOutcome failedOutcome = params.makeOutcome();
963         failedOutcome.setResult(PolicyResult.FAILURE);
964
965         AtomicBoolean invoked = new AtomicBoolean();
966
967         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
968             invoked.set(true);
969             return CompletableFuture.completedFuture(params.makeOutcome());
970         };
971
972         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
973
974         CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome);
975         assertFalse(future.isDone());
976         assertTrue(executor.runAll());
977
978         // should not have run the task
979         assertFalse(future.isDone());
980
981         // should not have even invoked the task
982         assertFalse(invoked.get());
983
984         // controller should have the failed task
985         assertTrue(controller.isDone());
986         assertSame(failedOutcome, controller.get());
987     }
988
989     /**
990      * Tests doTask(Function) when the previous outcome was failed, but not checking
991      * success.
992      */
993     @Test
994     public void testDoTaskFunctionUncheckedFailure() throws Exception {
995         final OperationOutcome taskOutcome = params.makeOutcome();
996
997         final OperationOutcome failedOutcome = params.makeOutcome();
998         failedOutcome.setResult(PolicyResult.FAILURE);
999
1000         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
1001
1002         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
1003
1004         CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, task).apply(failedOutcome);
1005
1006         assertTrue(future.isDone());
1007         assertSame(taskOutcome, future.get());
1008
1009         // controller should not be done yet
1010         assertFalse(controller.isDone());
1011     }
1012
1013     /**
1014      * Tests callbackStarted() when the pipeline has already been stopped.
1015      */
1016     @Test
1017     public void testCallbackStartedNotRunning() {
1018         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1019
1020         /*
1021          * arrange to stop the controller when the start-callback is invoked, but capture
1022          * the outcome
1023          */
1024         params = params.toBuilder().startCallback(oper -> {
1025             starter(oper);
1026             future.get().cancel(false);
1027         }).build();
1028
1029         future.set(oper.startOperation(params));
1030         assertTrue(executor.runAll());
1031
1032         // should have only run once
1033         assertEquals(1, numStart);
1034     }
1035
1036     /**
1037      * Tests callbackCompleted() when the pipeline has already been stopped.
1038      */
1039     @Test
1040     public void testCallbackCompletedNotRunning() {
1041         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1042
1043         // arrange to stop the controller when the start-callback is invoked
1044         params = params.toBuilder().startCallback(oper -> {
1045             future.get().cancel(false);
1046         }).build();
1047
1048         future.set(oper.startOperation(params));
1049         assertTrue(executor.runAll());
1050
1051         // should not have been set
1052         assertNull(opend);
1053         assertEquals(0, numEnd);
1054     }
1055
1056     @Test
1057     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1058         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1059
1060         OperationOutcome outcome;
1061
1062         outcome = new OperationOutcome();
1063         oper.setOutcome(params, outcome, timex);
1064         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1065         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1066
1067         outcome = new OperationOutcome();
1068         oper.setOutcome(params, outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1069         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1070         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1071     }
1072
1073     @Test
1074     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1075         OperationOutcome outcome;
1076
1077         outcome = new OperationOutcome();
1078         oper.setOutcome(params, outcome, PolicyResult.SUCCESS);
1079         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1080         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1081
1082         for (PolicyResult result : FAILURE_RESULTS) {
1083             outcome = new OperationOutcome();
1084             oper.setOutcome(params, outcome, result);
1085             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1086             assertEquals(result.toString(), result, outcome.getResult());
1087         }
1088     }
1089
1090     @Test
1091     public void testIsTimeout() {
1092         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1093
1094         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1095         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1096         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1097         assertFalse(oper.isTimeout(new CompletionException(null)));
1098         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1099
1100         assertTrue(oper.isTimeout(timex));
1101         assertTrue(oper.isTimeout(new CompletionException(timex)));
1102     }
1103
1104     @Test
1105     public void testGetTimeOutMillis() {
1106         assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(params.getTimeoutSec()));
1107
1108         params = params.toBuilder().timeoutSec(null).build();
1109         assertEquals(0, oper.getTimeOutMillis(params.getTimeoutSec()));
1110     }
1111
1112     private void starter(OperationOutcome oper) {
1113         ++numStart;
1114         tstart = oper.getStart();
1115         opstart = oper;
1116     }
1117
1118     private void completer(OperationOutcome oper) {
1119         ++numEnd;
1120         opend = oper;
1121     }
1122
1123     /**
1124      * Gets a function that does nothing.
1125      *
1126      * @param <T> type of input parameter expected by the function
1127      * @return a function that does nothing
1128      */
1129     private <T> Consumer<T> noop() {
1130         return unused -> {
1131         };
1132     }
1133
1134     private OperationOutcome makeSuccess() {
1135         OperationOutcome outcome = params.makeOutcome();
1136         outcome.setResult(PolicyResult.SUCCESS);
1137
1138         return outcome;
1139     }
1140
1141     private OperationOutcome makeFailure() {
1142         OperationOutcome outcome = params.makeOutcome();
1143         outcome.setResult(PolicyResult.FAILURE);
1144
1145         return outcome;
1146     }
1147
1148     /**
1149      * Verifies a run.
1150      *
1151      * @param testName test name
1152      * @param expectedCallbacks number of callbacks expected
1153      * @param expectedOperations number of operation invocations expected
1154      * @param expectedResult expected outcome
1155      */
1156     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1157                     PolicyResult expectedResult) {
1158
1159         String expectedSubRequestId =
1160                         (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1161
1162         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1163     }
1164
1165     /**
1166      * Verifies a run.
1167      *
1168      * @param testName test name
1169      * @param expectedCallbacks number of callbacks expected
1170      * @param expectedOperations number of operation invocations expected
1171      * @param expectedResult expected outcome
1172      * @param expectedSubRequestId expected sub request ID
1173      * @param manipulator function to modify the future returned by
1174      *        {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before
1175      *        the tasks in the executor are run
1176      */
1177     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1178                     String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1179
1180         CompletableFuture<OperationOutcome> future = oper.startOperation(params);
1181
1182         manipulator.accept(future);
1183
1184         assertTrue(testName, executor.runAll());
1185
1186         assertEquals(testName, expectedCallbacks, numStart);
1187         assertEquals(testName, expectedCallbacks, numEnd);
1188
1189         if (expectedCallbacks > 0) {
1190             assertNotNull(testName, opstart);
1191             assertNotNull(testName, opend);
1192             assertEquals(testName, expectedResult, opend.getResult());
1193
1194             assertSame(testName, tstart, opstart.getStart());
1195             assertSame(testName, tstart, opend.getStart());
1196
1197             try {
1198                 assertTrue(future.isDone());
1199                 assertSame(testName, opend, future.get());
1200
1201             } catch (InterruptedException | ExecutionException e) {
1202                 throw new IllegalStateException(e);
1203             }
1204
1205             if (expectedOperations > 0) {
1206                 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1207             }
1208         }
1209
1210         assertEquals(testName, expectedOperations, oper.getCount());
1211     }
1212
1213     private class MyOper extends OperatorPartial {
1214         @Getter
1215         private int count = 0;
1216
1217         @Setter
1218         private boolean genException;
1219
1220         @Setter
1221         private int maxFailures = 0;
1222
1223         @Setter
1224         private CompletableFuture<OperationOutcome> preProcessor;
1225
1226         public MyOper() {
1227             super(ACTOR, OPERATOR);
1228         }
1229
1230         @Override
1231         protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt,
1232                         OperationOutcome operation) {
1233             ++count;
1234             if (genException) {
1235                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1236             }
1237
1238             operation.setSubRequestId(String.valueOf(attempt));
1239
1240             if (count > maxFailures) {
1241                 operation.setResult(PolicyResult.SUCCESS);
1242             } else {
1243                 operation.setResult(PolicyResult.FAILURE);
1244             }
1245
1246             return operation;
1247         }
1248
1249         @Override
1250         protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
1251             return (preProcessor != null ? preProcessor : super.startPreprocessorAsync(params));
1252         }
1253
1254         @Override
1255         protected Executor getBlockingExecutor() {
1256             return executor;
1257         }
1258     }
1259
1260     /**
1261      * Executor that will run tasks until the queue is empty or a maximum number of tasks
1262      * have been executed.
1263      */
1264     private static class MyExec implements Executor {
1265         private static final int MAX_TASKS = MAX_PARALLEL_REQUESTS * 100;
1266
1267         private Queue<Runnable> commands = new LinkedList<>();
1268
1269         public MyExec() {
1270             // do nothing
1271         }
1272
1273         public int getQueueLength() {
1274             return commands.size();
1275         }
1276
1277         @Override
1278         public void execute(Runnable command) {
1279             commands.add(command);
1280         }
1281
1282         public boolean runAll() {
1283             for (int count = 0; count < MAX_TASKS && !commands.isEmpty(); ++count) {
1284                 commands.remove().run();
1285             }
1286
1287             return commands.isEmpty();
1288         }
1289     }
1290 }