f28c1f6c6fe4de46031215414d16bcdef0b3d2b1
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32
33 import java.time.Instant;
34 import java.util.Arrays;
35 import java.util.LinkedList;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Map.Entry;
39 import java.util.UUID;
40 import java.util.concurrent.CompletableFuture;
41 import java.util.concurrent.CompletionException;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.Executor;
44 import java.util.concurrent.ForkJoinPool;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import java.util.concurrent.atomic.AtomicBoolean;
49 import java.util.concurrent.atomic.AtomicInteger;
50 import java.util.concurrent.atomic.AtomicReference;
51 import java.util.function.Consumer;
52 import java.util.function.Function;
53 import java.util.stream.Collectors;
54 import lombok.Getter;
55 import lombok.Setter;
56 import org.junit.Before;
57 import org.junit.Test;
58 import org.onap.policy.common.utils.coder.CoderException;
59 import org.onap.policy.common.utils.coder.StandardCoder;
60 import org.onap.policy.controlloop.ControlLoopOperation;
61 import org.onap.policy.controlloop.VirtualControlLoopEvent;
62 import org.onap.policy.controlloop.actorserviceprovider.Operation;
63 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
64 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
65 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
66 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
67 import org.onap.policy.controlloop.policy.PolicyResult;
68
69 public class OperationPartialTest {
70     private static final int MAX_PARALLEL_REQUESTS = 10;
71     private static final String EXPECTED_EXCEPTION = "expected exception";
72     private static final String ACTOR = "my-actor";
73     private static final String OPERATION = "my-operation";
74     private static final String TARGET = "my-target";
75     private static final int TIMEOUT = 1000;
76     private static final UUID REQ_ID = UUID.randomUUID();
77
78     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
79                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
80
81     private VirtualControlLoopEvent event;
82     private ControlLoopEventContext context;
83     private MyExec executor;
84     private ControlLoopOperationParams params;
85
86     private MyOper oper;
87
88     private int numStart;
89     private int numEnd;
90
91     private Instant tstart;
92
93     private OperationOutcome opstart;
94     private OperationOutcome opend;
95
96     private OperatorPartial operator;
97
98     /**
99      * Initializes the fields, including {@link #oper}.
100      */
101     @Before
102     public void setUp() {
103         event = new VirtualControlLoopEvent();
104         event.setRequestId(REQ_ID);
105
106         context = new ControlLoopEventContext(event);
107         executor = new MyExec(100 * MAX_PARALLEL_REQUESTS);
108
109         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
110                         .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
111                         .startCallback(this::starter).targetEntity(TARGET).build();
112
113         operator = new OperatorPartial(ACTOR, OPERATION) {
114             @Override
115             public Executor getBlockingExecutor() {
116                 return executor;
117             }
118
119             @Override
120             public Operation buildOperation(ControlLoopOperationParams params) {
121                 return null;
122             }
123         };
124
125         operator.configure(null);
126         operator.start();
127
128         oper = new MyOper();
129
130         tstart = null;
131
132         opstart = null;
133         opend = null;
134     }
135
136     @Test
137     public void testOperatorPartial_testGetActorName_testGetName() {
138         assertEquals(ACTOR, oper.getActorName());
139         assertEquals(OPERATION, oper.getName());
140         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
141     }
142
143     @Test
144     public void testGetBlockingThread() throws Exception {
145         CompletableFuture<Void> future = new CompletableFuture<>();
146
147         // use the real executor
148         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
149             @Override
150             public Operation buildOperation(ControlLoopOperationParams params) {
151                 return null;
152             }
153         };
154
155         oper2.getBlockingExecutor().execute(() -> future.complete(null));
156
157         assertNull(future.get(5, TimeUnit.SECONDS));
158     }
159
160     /**
161      * Exercises the doXxx() methods.
162      */
163     @Test
164     public void testDoXxx() {
165         assertThatCode(() -> operator.doConfigure(null)).doesNotThrowAnyException();
166         assertThatCode(() -> operator.doStart()).doesNotThrowAnyException();
167         assertThatCode(() -> operator.doStop()).doesNotThrowAnyException();
168         assertThatCode(() -> operator.doShutdown()).doesNotThrowAnyException();
169
170     }
171
172     @Test
173     public void testStart() {
174         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
175     }
176
177     /**
178      * Tests startOperation() when the operator is not running.
179      */
180     @Test
181     public void testStartNotRunning() {
182         // stop the operator
183         operator.stop();
184
185         assertThatIllegalStateException().isThrownBy(() -> oper.start());
186     }
187
188     /**
189      * Tests startOperation() when the operation has a preprocessor.
190      */
191     @Test
192     public void testStartWithPreprocessor() {
193         AtomicInteger count = new AtomicInteger();
194
195         CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
196             count.incrementAndGet();
197             return makeSuccess();
198         }, executor);
199
200         oper.setGuard(preproc);
201
202         verifyRun("testStartWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
203
204         assertEquals(1, count.get());
205     }
206
207     /**
208      * Tests start() with multiple running requests.
209      */
210     @Test
211     public void testStartMultiple() {
212         for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
213             oper.start();
214         }
215
216         assertTrue(executor.runAll());
217
218         assertNotNull(opstart);
219         assertNotNull(opend);
220         assertEquals(PolicyResult.SUCCESS, opend.getResult());
221
222         assertEquals(MAX_PARALLEL_REQUESTS, numStart);
223         assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
224         assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
225     }
226
227     /**
228      * Tests startPreprocessor() when the preprocessor returns a failure.
229      */
230     @Test
231     public void testStartPreprocessorFailure() {
232         oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
233
234         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
235     }
236
237     /**
238      * Tests startPreprocessor() when the preprocessor throws an exception.
239      */
240     @Test
241     public void testStartPreprocessorException() {
242         // arrange for the preprocessor to throw an exception
243         oper.setGuard(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
244
245         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
246     }
247
248     /**
249      * Tests startPreprocessor() when the pipeline is not running.
250      */
251     @Test
252     public void testStartPreprocessorNotRunning() {
253         // arrange for the preprocessor to return success, which will be ignored
254         oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
255
256         oper.start().cancel(false);
257         assertTrue(executor.runAll());
258
259         assertNull(opstart);
260         assertNull(opend);
261
262         assertEquals(0, numStart);
263         assertEquals(0, oper.getCount());
264         assertEquals(0, numEnd);
265     }
266
267     /**
268      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
269      */
270     @Test
271     public void testStartPreprocessorBuilderException() {
272         oper = new MyOper() {
273             @Override
274             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
275                 throw new IllegalStateException(EXPECTED_EXCEPTION);
276             }
277         };
278
279         assertThatIllegalStateException().isThrownBy(() -> oper.start());
280
281         // should be nothing in the queue
282         assertEquals(0, executor.getQueueLength());
283     }
284
285     @Test
286     public void testStartPreprocessorAsync() {
287         assertNull(oper.startPreprocessorAsync());
288     }
289
290     @Test
291     public void testStartGuardAsync() {
292         assertNull(oper.startGuardAsync());
293     }
294
295     @Test
296     public void testStartOperationAsync() {
297         oper.start();
298         assertTrue(executor.runAll());
299
300         assertEquals(1, oper.getCount());
301     }
302
303     @Test
304     public void testIsSuccess() {
305         OperationOutcome outcome = new OperationOutcome();
306
307         outcome.setResult(PolicyResult.SUCCESS);
308         assertTrue(oper.isSuccess(outcome));
309
310         for (PolicyResult failure : FAILURE_RESULTS) {
311             outcome.setResult(failure);
312             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
313         }
314     }
315
316     @Test
317     public void testIsActorFailed() {
318         assertFalse(oper.isActorFailed(null));
319
320         OperationOutcome outcome = params.makeOutcome();
321
322         // incorrect outcome
323         outcome.setResult(PolicyResult.SUCCESS);
324         assertFalse(oper.isActorFailed(outcome));
325
326         outcome.setResult(PolicyResult.FAILURE_RETRIES);
327         assertFalse(oper.isActorFailed(outcome));
328
329         // correct outcome
330         outcome.setResult(PolicyResult.FAILURE);
331
332         // incorrect actor
333         outcome.setActor(TARGET);
334         assertFalse(oper.isActorFailed(outcome));
335         outcome.setActor(null);
336         assertFalse(oper.isActorFailed(outcome));
337         outcome.setActor(ACTOR);
338
339         // incorrect operation
340         outcome.setOperation(TARGET);
341         assertFalse(oper.isActorFailed(outcome));
342         outcome.setOperation(null);
343         assertFalse(oper.isActorFailed(outcome));
344         outcome.setOperation(OPERATION);
345
346         // correct values
347         assertTrue(oper.isActorFailed(outcome));
348     }
349
350     @Test
351     public void testDoOperation() {
352         /*
353          * Use an operation that doesn't override doOperation().
354          */
355         OperationPartial oper2 = new OperationPartial(params, operator) {};
356
357         oper2.start();
358         assertTrue(executor.runAll());
359
360         assertNotNull(opend);
361         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
362     }
363
364     @Test
365     public void testTimeout() throws Exception {
366
367         // use a real executor
368         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
369
370         // trigger timeout very quickly
371         oper = new MyOper() {
372             @Override
373             protected long getTimeoutMs(Integer timeoutSec) {
374                 return 1;
375             }
376
377             @Override
378             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
379
380                 OperationOutcome outcome2 = params.makeOutcome();
381                 outcome2.setResult(PolicyResult.SUCCESS);
382
383                 /*
384                  * Create an incomplete future that will timeout after the operation's
385                  * timeout. If it fires before the other timer, then it will return a
386                  * SUCCESS outcome.
387                  */
388                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
389                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
390                                 params.getExecutor());
391
392                 return future;
393             }
394         };
395
396         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
397     }
398
399     /**
400      * Tests retry functions, when the count is set to zero and retries are exhausted.
401      */
402     @Test
403     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
404         params = params.toBuilder().retry(0).build();
405
406         // new params, thus need a new operation
407         oper = new MyOper();
408
409         oper.setMaxFailures(10);
410
411         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
412     }
413
414     /**
415      * Tests retry functions, when the count is null and retries are exhausted.
416      */
417     @Test
418     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
419         params = params.toBuilder().retry(null).build();
420
421         // new params, thus need a new operation
422         oper = new MyOper();
423
424         oper.setMaxFailures(10);
425
426         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
427     }
428
429     /**
430      * Tests retry functions, when retries are exhausted.
431      */
432     @Test
433     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
434         final int maxRetries = 3;
435         params = params.toBuilder().retry(maxRetries).build();
436
437         // new params, thus need a new operation
438         oper = new MyOper();
439
440         oper.setMaxFailures(10);
441
442         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
443                         PolicyResult.FAILURE_RETRIES);
444     }
445
446     /**
447      * Tests retry functions, when a success follows some retries.
448      */
449     @Test
450     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
451         params = params.toBuilder().retry(10).build();
452
453         // new params, thus need a new operation
454         oper = new MyOper();
455
456         final int maxFailures = 3;
457         oper.setMaxFailures(maxFailures);
458
459         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
460                         PolicyResult.SUCCESS);
461     }
462
463     /**
464      * Tests retry functions, when the outcome is {@code null}.
465      */
466     @Test
467     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
468
469         // arrange to return null from doOperation()
470         oper = new MyOper() {
471             @Override
472             protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
473
474                 // update counters
475                 super.doOperation(attempt, operation);
476                 return null;
477             }
478         };
479
480         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
481     }
482
483     @Test
484     public void testSleep() throws Exception {
485         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
486         assertTrue(future.isDone());
487         assertNull(future.get());
488
489         // edge case
490         future = oper.sleep(0, TimeUnit.SECONDS);
491         assertTrue(future.isDone());
492         assertNull(future.get());
493
494         /*
495          * Start a second sleep we can use to check the first while it's running.
496          */
497         tstart = Instant.now();
498         future = oper.sleep(100, TimeUnit.MILLISECONDS);
499
500         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
501
502         // wait for second to complete and verify that the first has not completed
503         future2.get();
504         assertFalse(future.isDone());
505
506         // wait for second to complete
507         future.get();
508
509         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
510         assertTrue(diff >= 99);
511     }
512
513     @Test
514     public void testIsSameOperation() {
515         assertFalse(oper.isSameOperation(null));
516
517         OperationOutcome outcome = params.makeOutcome();
518
519         // wrong actor - should be false
520         outcome.setActor(null);
521         assertFalse(oper.isSameOperation(outcome));
522         outcome.setActor(TARGET);
523         assertFalse(oper.isSameOperation(outcome));
524         outcome.setActor(ACTOR);
525
526         // wrong operation - should be null
527         outcome.setOperation(null);
528         assertFalse(oper.isSameOperation(outcome));
529         outcome.setOperation(TARGET);
530         assertFalse(oper.isSameOperation(outcome));
531         outcome.setOperation(OPERATION);
532
533         assertTrue(oper.isSameOperation(outcome));
534     }
535
536     /**
537      * Tests handleFailure() when the outcome is a success.
538      */
539     @Test
540     public void testHandlePreprocessorFailureTrue() {
541         oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
542         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
543     }
544
545     /**
546      * Tests handleFailure() when the outcome is <i>not</i> a success.
547      */
548     @Test
549     public void testHandlePreprocessorFailureFalse() throws Exception {
550         oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
551         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
552     }
553
554     /**
555      * Tests handleFailure() when the outcome is {@code null}.
556      */
557     @Test
558     public void testHandlePreprocessorFailureNull() throws Exception {
559         // arrange to return null from the preprocessor
560         oper.setGuard(CompletableFuture.completedFuture(null));
561
562         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
563     }
564
565     @Test
566     public void testFromException() {
567         // arrange to generate an exception when operation runs
568         oper.setGenException(true);
569
570         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
571     }
572
573     /**
574      * Tests fromException() when there is no exception.
575      */
576     @Test
577     public void testFromExceptionNoExcept() {
578         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
579     }
580
581     /**
582      * Tests both flavors of anyOf(), because one invokes the other.
583      */
584     @Test
585     public void testAnyOf() throws Exception {
586         // first task completes, others do not
587         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
588
589         final OperationOutcome outcome = params.makeOutcome();
590
591         tasks.add(CompletableFuture.completedFuture(outcome));
592         tasks.add(new CompletableFuture<>());
593         tasks.add(new CompletableFuture<>());
594
595         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
596         assertTrue(executor.runAll());
597
598         assertTrue(result.isDone());
599         assertSame(outcome, result.get());
600
601         // second task completes, others do not
602         tasks = new LinkedList<>();
603
604         tasks.add(new CompletableFuture<>());
605         tasks.add(CompletableFuture.completedFuture(outcome));
606         tasks.add(new CompletableFuture<>());
607
608         result = oper.anyOf(tasks);
609         assertTrue(executor.runAll());
610
611         assertTrue(result.isDone());
612         assertSame(outcome, result.get());
613
614         // third task completes, others do not
615         tasks = new LinkedList<>();
616
617         tasks.add(new CompletableFuture<>());
618         tasks.add(new CompletableFuture<>());
619         tasks.add(CompletableFuture.completedFuture(outcome));
620
621         result = oper.anyOf(tasks);
622         assertTrue(executor.runAll());
623
624         assertTrue(result.isDone());
625         assertSame(outcome, result.get());
626     }
627
628     /**
629      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
630      */
631     @Test
632     @SuppressWarnings("unchecked")
633     public void testAnyOfEdge() throws Exception {
634         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
635
636         // zero items: check both using a list and using an array
637         assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf(tasks));
638         assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf());
639
640         // one item: : check both using a list and using an array
641         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
642         tasks.add(future1);
643
644         assertSame(future1, oper.anyOf(tasks));
645         assertSame(future1, oper.anyOf(future1));
646     }
647
648     /**
649      * Tests both flavors of allOf(), because one invokes the other.
650      */
651     @Test
652     public void testAllOf() throws Exception {
653         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
654
655         final OperationOutcome outcome = params.makeOutcome();
656
657         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
658         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
659         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
660
661         tasks.add(future1);
662         tasks.add(future2);
663         tasks.add(future3);
664
665         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
666
667         assertTrue(executor.runAll());
668         assertFalse(result.isDone());
669         future1.complete(outcome);
670
671         // complete 3 before 2
672         assertTrue(executor.runAll());
673         assertFalse(result.isDone());
674         future3.complete(outcome);
675
676         assertTrue(executor.runAll());
677         assertFalse(result.isDone());
678         future2.complete(outcome);
679
680         // all of them are now done
681         assertTrue(executor.runAll());
682         assertTrue(result.isDone());
683         assertSame(outcome, result.get());
684     }
685
686     /**
687      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
688      */
689     @Test
690     @SuppressWarnings("unchecked")
691     public void testAllOfEdge() throws Exception {
692         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
693
694         // zero items: check both using a list and using an array
695         assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf(tasks));
696         assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf());
697
698         // one item: : check both using a list and using an array
699         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
700         tasks.add(future1);
701
702         assertSame(future1, oper.allOf(tasks));
703         assertSame(future1, oper.allOf(future1));
704     }
705
706     @Test
707     public void testCombineOutcomes() throws Exception {
708         // only one outcome
709         verifyOutcomes(0, PolicyResult.SUCCESS);
710         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
711
712         // maximum is in different positions
713         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
714         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
715         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
716
717         // null outcome
718         final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
719         tasks.add(CompletableFuture.completedFuture(null));
720         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
721
722         assertTrue(executor.runAll());
723         assertTrue(result.isDone());
724         assertNull(result.get());
725
726         // one throws an exception during execution
727         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
728
729         tasks.clear();
730         tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
731         tasks.add(CompletableFuture.failedFuture(except));
732         tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
733         result = oper.allOf(tasks);
734
735         assertTrue(executor.runAll());
736         assertTrue(result.isCompletedExceptionally());
737         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
738     }
739
740     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
741         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
742
743
744         OperationOutcome expectedOutcome = null;
745
746         for (int count = 0; count < results.length; ++count) {
747             OperationOutcome outcome = params.makeOutcome();
748             outcome.setResult(results[count]);
749             tasks.add(CompletableFuture.completedFuture(outcome));
750
751             if (count == expected) {
752                 expectedOutcome = outcome;
753             }
754         }
755
756         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
757
758         assertTrue(executor.runAll());
759         assertTrue(result.isDone());
760         assertSame(expectedOutcome, result.get());
761     }
762
763     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
764                     final OperationOutcome taskOutcome) {
765
766         return outcome -> CompletableFuture.completedFuture(taskOutcome);
767     }
768
769     @Test
770     public void testDetmPriority() throws CoderException {
771         assertEquals(1, oper.detmPriority(null));
772
773         OperationOutcome outcome = params.makeOutcome();
774
775         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
776                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
777                         PolicyResult.FAILURE_EXCEPTION, 6);
778
779         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
780             outcome.setResult(ent.getKey());
781             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
782         }
783
784         /*
785          * Test null result. We can't actually set it to null, because the set() method
786          * won't allow it. Instead, we decode it from a structure.
787          */
788         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
789         assertEquals(1, oper.detmPriority(outcome));
790     }
791
792     /**
793      * Tests doTask(Future) when the controller is not running.
794      */
795     @Test
796     public void testDoTaskFutureNotRunning() throws Exception {
797         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
798
799         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
800         controller.complete(params.makeOutcome());
801
802         CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, params.makeOutcome(), taskFuture);
803         assertFalse(future.isDone());
804         assertTrue(executor.runAll());
805
806         // should not have run the task
807         assertFalse(future.isDone());
808
809         // should have canceled the task future
810         assertTrue(taskFuture.isCancelled());
811     }
812
813     /**
814      * Tests doTask(Future) when the previous outcome was successful.
815      */
816     @Test
817     public void testDoTaskFutureSuccess() throws Exception {
818         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
819         final OperationOutcome taskOutcome = params.makeOutcome();
820
821         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
822
823         CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, params.makeOutcome(), taskFuture);
824
825         taskFuture.complete(taskOutcome);
826         assertTrue(executor.runAll());
827
828         assertTrue(future.isDone());
829         assertSame(taskOutcome, future.get());
830
831         // controller should not be done yet
832         assertFalse(controller.isDone());
833     }
834
835     /**
836      * Tests doTask(Future) when the previous outcome was failed.
837      */
838     @Test
839     public void testDoTaskFutureFailure() throws Exception {
840         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
841         final OperationOutcome failedOutcome = params.makeOutcome();
842         failedOutcome.setResult(PolicyResult.FAILURE);
843
844         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
845
846         CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, failedOutcome, taskFuture);
847         assertFalse(future.isDone());
848         assertTrue(executor.runAll());
849
850         // should not have run the task
851         assertFalse(future.isDone());
852
853         // should have canceled the task future
854         assertTrue(taskFuture.isCancelled());
855
856         // controller SHOULD be done now
857         assertTrue(controller.isDone());
858         assertSame(failedOutcome, controller.get());
859     }
860
861     /**
862      * Tests doTask(Future) when the previous outcome was failed, but not checking
863      * success.
864      */
865     @Test
866     public void testDoTaskFutureUncheckedFailure() throws Exception {
867         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
868         final OperationOutcome failedOutcome = params.makeOutcome();
869         failedOutcome.setResult(PolicyResult.FAILURE);
870
871         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
872
873         CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, failedOutcome, taskFuture);
874         assertFalse(future.isDone());
875
876         // complete the task
877         OperationOutcome taskOutcome = params.makeOutcome();
878         taskFuture.complete(taskOutcome);
879
880         assertTrue(executor.runAll());
881
882         // should have run the task
883         assertTrue(future.isDone());
884
885         assertTrue(future.isDone());
886         assertSame(taskOutcome, future.get());
887
888         // controller should not be done yet
889         assertFalse(controller.isDone());
890     }
891
892     /**
893      * Tests doTask(Function) when the controller is not running.
894      */
895     @Test
896     public void testDoTaskFunctionNotRunning() throws Exception {
897         AtomicBoolean invoked = new AtomicBoolean();
898
899         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
900             invoked.set(true);
901             return CompletableFuture.completedFuture(params.makeOutcome());
902         };
903
904         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
905         controller.complete(params.makeOutcome());
906
907         CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(params.makeOutcome());
908         assertFalse(future.isDone());
909         assertTrue(executor.runAll());
910
911         // should not have run the task
912         assertFalse(future.isDone());
913
914         // should not have even invoked the task
915         assertFalse(invoked.get());
916     }
917
918     /**
919      * Tests doTask(Function) when the previous outcome was successful.
920      */
921     @Test
922     public void testDoTaskFunctionSuccess() throws Exception {
923         final OperationOutcome taskOutcome = params.makeOutcome();
924
925         final OperationOutcome failedOutcome = params.makeOutcome();
926
927         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
928
929         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
930
931         CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
932
933         assertTrue(future.isDone());
934         assertSame(taskOutcome, future.get());
935
936         // controller should not be done yet
937         assertFalse(controller.isDone());
938     }
939
940     /**
941      * Tests doTask(Function) when the previous outcome was failed.
942      */
943     @Test
944     public void testDoTaskFunctionFailure() throws Exception {
945         final OperationOutcome failedOutcome = params.makeOutcome();
946         failedOutcome.setResult(PolicyResult.FAILURE);
947
948         AtomicBoolean invoked = new AtomicBoolean();
949
950         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
951             invoked.set(true);
952             return CompletableFuture.completedFuture(params.makeOutcome());
953         };
954
955         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
956
957         CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
958         assertFalse(future.isDone());
959         assertTrue(executor.runAll());
960
961         // should not have run the task
962         assertFalse(future.isDone());
963
964         // should not have even invoked the task
965         assertFalse(invoked.get());
966
967         // controller should have the failed task
968         assertTrue(controller.isDone());
969         assertSame(failedOutcome, controller.get());
970     }
971
972     /**
973      * Tests doTask(Function) when the previous outcome was failed, but not checking
974      * success.
975      */
976     @Test
977     public void testDoTaskFunctionUncheckedFailure() throws Exception {
978         final OperationOutcome taskOutcome = params.makeOutcome();
979
980         final OperationOutcome failedOutcome = params.makeOutcome();
981         failedOutcome.setResult(PolicyResult.FAILURE);
982
983         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
984
985         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
986
987         CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(failedOutcome);
988
989         assertTrue(future.isDone());
990         assertSame(taskOutcome, future.get());
991
992         // controller should not be done yet
993         assertFalse(controller.isDone());
994     }
995
996     /**
997      * Tests callbackStarted() when the pipeline has already been stopped.
998      */
999     @Test
1000     public void testCallbackStartedNotRunning() {
1001         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1002
1003         /*
1004          * arrange to stop the controller when the start-callback is invoked, but capture
1005          * the outcome
1006          */
1007         params = params.toBuilder().startCallback(oper -> {
1008             starter(oper);
1009             future.get().cancel(false);
1010         }).build();
1011
1012         // new params, thus need a new operation
1013         oper = new MyOper();
1014
1015         future.set(oper.start());
1016         assertTrue(executor.runAll());
1017
1018         // should have only run once
1019         assertEquals(1, numStart);
1020     }
1021
1022     /**
1023      * Tests callbackCompleted() when the pipeline has already been stopped.
1024      */
1025     @Test
1026     public void testCallbackCompletedNotRunning() {
1027         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1028
1029         // arrange to stop the controller when the start-callback is invoked
1030         params = params.toBuilder().startCallback(oper -> {
1031             future.get().cancel(false);
1032         }).build();
1033
1034         // new params, thus need a new operation
1035         oper = new MyOper();
1036
1037         future.set(oper.start());
1038         assertTrue(executor.runAll());
1039
1040         // should not have been set
1041         assertNull(opend);
1042         assertEquals(0, numEnd);
1043     }
1044
1045     @Test
1046     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1047         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1048
1049         OperationOutcome outcome;
1050
1051         outcome = new OperationOutcome();
1052         oper.setOutcome(outcome, timex);
1053         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1054         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1055
1056         outcome = new OperationOutcome();
1057         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1058         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1059         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1060     }
1061
1062     @Test
1063     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1064         OperationOutcome outcome;
1065
1066         outcome = new OperationOutcome();
1067         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1068         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1069         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1070
1071         for (PolicyResult result : FAILURE_RESULTS) {
1072             outcome = new OperationOutcome();
1073             oper.setOutcome(outcome, result);
1074             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1075             assertEquals(result.toString(), result, outcome.getResult());
1076         }
1077     }
1078
1079     @Test
1080     public void testIsTimeout() {
1081         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1082
1083         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1084         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1085         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1086         assertFalse(oper.isTimeout(new CompletionException(null)));
1087         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1088
1089         assertTrue(oper.isTimeout(timex));
1090         assertTrue(oper.isTimeout(new CompletionException(timex)));
1091     }
1092
1093     @Test
1094     public void testGetRetry() {
1095         assertEquals(0, oper.getRetry(null));
1096         assertEquals(10, oper.getRetry(10));
1097     }
1098
1099     @Test
1100     public void testGetRetryWait() {
1101         // need an operator that doesn't override the retry time
1102         OperationPartial oper2 = new OperationPartial(params, operator) {};
1103         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1104     }
1105
1106     @Test
1107     public void testGetTimeOutMs() {
1108         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1109
1110         params = params.toBuilder().timeoutSec(null).build();
1111
1112         // new params, thus need a new operation
1113         oper = new MyOper();
1114
1115         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1116     }
1117
1118     private void starter(OperationOutcome oper) {
1119         ++numStart;
1120         tstart = oper.getStart();
1121         opstart = oper;
1122     }
1123
1124     private void completer(OperationOutcome oper) {
1125         ++numEnd;
1126         opend = oper;
1127     }
1128
1129     /**
1130      * Gets a function that does nothing.
1131      *
1132      * @param <T> type of input parameter expected by the function
1133      * @return a function that does nothing
1134      */
1135     private <T> Consumer<T> noop() {
1136         return unused -> {
1137         };
1138     }
1139
1140     private OperationOutcome makeSuccess() {
1141         OperationOutcome outcome = params.makeOutcome();
1142         outcome.setResult(PolicyResult.SUCCESS);
1143
1144         return outcome;
1145     }
1146
1147     private OperationOutcome makeFailure() {
1148         OperationOutcome outcome = params.makeOutcome();
1149         outcome.setResult(PolicyResult.FAILURE);
1150
1151         return outcome;
1152     }
1153
1154     /**
1155      * Verifies a run.
1156      *
1157      * @param testName test name
1158      * @param expectedCallbacks number of callbacks expected
1159      * @param expectedOperations number of operation invocations expected
1160      * @param expectedResult expected outcome
1161      */
1162     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1163                     PolicyResult expectedResult) {
1164
1165         String expectedSubRequestId =
1166                         (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1167
1168         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1169     }
1170
1171     /**
1172      * Verifies a run.
1173      *
1174      * @param testName test name
1175      * @param expectedCallbacks number of callbacks expected
1176      * @param expectedOperations number of operation invocations expected
1177      * @param expectedResult expected outcome
1178      * @param expectedSubRequestId expected sub request ID
1179      * @param manipulator function to modify the future returned by
1180      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1181      *        in the executor are run
1182      */
1183     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1184                     String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1185
1186         CompletableFuture<OperationOutcome> future = oper.start();
1187
1188         manipulator.accept(future);
1189
1190         assertTrue(testName, executor.runAll());
1191
1192         assertEquals(testName, expectedCallbacks, numStart);
1193         assertEquals(testName, expectedCallbacks, numEnd);
1194
1195         if (expectedCallbacks > 0) {
1196             assertNotNull(testName, opstart);
1197             assertNotNull(testName, opend);
1198             assertEquals(testName, expectedResult, opend.getResult());
1199
1200             assertSame(testName, tstart, opstart.getStart());
1201             assertSame(testName, tstart, opend.getStart());
1202
1203             try {
1204                 assertTrue(future.isDone());
1205                 assertSame(testName, opend, future.get());
1206
1207             } catch (InterruptedException | ExecutionException e) {
1208                 throw new IllegalStateException(e);
1209             }
1210
1211             if (expectedOperations > 0) {
1212                 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1213             }
1214         }
1215
1216         assertEquals(testName, expectedOperations, oper.getCount());
1217     }
1218
1219     private class MyOper extends OperationPartial {
1220         @Getter
1221         private int count = 0;
1222
1223         @Setter
1224         private boolean genException;
1225
1226         @Setter
1227         private int maxFailures = 0;
1228
1229         @Setter
1230         private CompletableFuture<OperationOutcome> guard;
1231
1232
1233         public MyOper() {
1234             super(OperationPartialTest.this.params, operator);
1235         }
1236
1237         @Override
1238         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1239             ++count;
1240             if (genException) {
1241                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1242             }
1243
1244             operation.setSubRequestId(String.valueOf(attempt));
1245
1246             if (count > maxFailures) {
1247                 operation.setResult(PolicyResult.SUCCESS);
1248             } else {
1249                 operation.setResult(PolicyResult.FAILURE);
1250             }
1251
1252             return operation;
1253         }
1254
1255         @Override
1256         protected CompletableFuture<OperationOutcome> startGuardAsync() {
1257             return (guard != null ? guard : super.startGuardAsync());
1258         }
1259
1260         @Override
1261         protected long getRetryWaitMs() {
1262             /*
1263              * Sleep timers run in the background, but we want to control things via the
1264              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1265              */
1266             return 0L;
1267         }
1268     }
1269 }