Merge "Implement validation and hierarchical get"
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32
33 import java.time.Instant;
34 import java.util.Arrays;
35 import java.util.LinkedList;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Map.Entry;
39 import java.util.Queue;
40 import java.util.UUID;
41 import java.util.concurrent.CompletableFuture;
42 import java.util.concurrent.CompletionException;
43 import java.util.concurrent.ExecutionException;
44 import java.util.concurrent.Executor;
45 import java.util.concurrent.ForkJoinPool;
46 import java.util.concurrent.Future;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import java.util.concurrent.atomic.AtomicBoolean;
50 import java.util.concurrent.atomic.AtomicInteger;
51 import java.util.concurrent.atomic.AtomicReference;
52 import java.util.function.Consumer;
53 import java.util.function.Function;
54 import java.util.stream.Collectors;
55 import lombok.Getter;
56 import lombok.Setter;
57 import org.junit.Before;
58 import org.junit.Test;
59 import org.onap.policy.common.utils.coder.CoderException;
60 import org.onap.policy.common.utils.coder.StandardCoder;
61 import org.onap.policy.controlloop.ControlLoopOperation;
62 import org.onap.policy.controlloop.VirtualControlLoopEvent;
63 import org.onap.policy.controlloop.actorserviceprovider.Operation;
64 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
65 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
66 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
67 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
68 import org.onap.policy.controlloop.policy.PolicyResult;
69
70 public class OperationPartialTest {
71     private static final int MAX_PARALLEL_REQUESTS = 10;
72     private static final String EXPECTED_EXCEPTION = "expected exception";
73     private static final String ACTOR = "my-actor";
74     private static final String OPERATION = "my-operation";
75     private static final String TARGET = "my-target";
76     private static final int TIMEOUT = 1000;
77     private static final UUID REQ_ID = UUID.randomUUID();
78
79     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
80                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
81
82     private VirtualControlLoopEvent event;
83     private ControlLoopEventContext context;
84     private MyExec executor;
85     private ControlLoopOperationParams params;
86
87     private MyOper oper;
88
89     private int numStart;
90     private int numEnd;
91
92     private Instant tstart;
93
94     private OperationOutcome opstart;
95     private OperationOutcome opend;
96
97     private OperatorPartial operator;
98
99     /**
100      * Initializes the fields, including {@link #oper}.
101      */
102     @Before
103     public void setUp() {
104         event = new VirtualControlLoopEvent();
105         event.setRequestId(REQ_ID);
106
107         context = new ControlLoopEventContext(event);
108         executor = new MyExec();
109
110         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
111                         .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
112                         .startCallback(this::starter).targetEntity(TARGET).build();
113
114         operator = new OperatorPartial(ACTOR, OPERATION) {
115             @Override
116             public Executor getBlockingExecutor() {
117                 return executor;
118             }
119
120             @Override
121             public Operation buildOperation(ControlLoopOperationParams params) {
122                 return null;
123             }
124         };
125
126         operator.configure(null);
127         operator.start();
128
129         oper = new MyOper();
130
131         tstart = null;
132
133         opstart = null;
134         opend = null;
135     }
136
137     @Test
138     public void testOperatorPartial_testGetActorName_testGetName() {
139         assertEquals(ACTOR, oper.getActorName());
140         assertEquals(OPERATION, oper.getName());
141         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
142     }
143
144     @Test
145     public void testGetBlockingThread() throws Exception {
146         CompletableFuture<Void> future = new CompletableFuture<>();
147
148         // use the real executor
149         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
150             @Override
151             public Operation buildOperation(ControlLoopOperationParams params) {
152                 return null;
153             }
154         };
155
156         oper2.getBlockingExecutor().execute(() -> future.complete(null));
157
158         assertNull(future.get(5, TimeUnit.SECONDS));
159     }
160
161     /**
162      * Exercises the doXxx() methods.
163      */
164     @Test
165     public void testDoXxx() {
166         assertThatCode(() -> operator.doConfigure(null)).doesNotThrowAnyException();
167         assertThatCode(() -> operator.doStart()).doesNotThrowAnyException();
168         assertThatCode(() -> operator.doStop()).doesNotThrowAnyException();
169         assertThatCode(() -> operator.doShutdown()).doesNotThrowAnyException();
170
171     }
172
173     @Test
174     public void testStart() {
175         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
176     }
177
178     /**
179      * Tests startOperation() when the operator is not running.
180      */
181     @Test
182     public void testStartNotRunning() {
183         // stop the operator
184         operator.stop();
185
186         assertThatIllegalStateException().isThrownBy(() -> oper.start());
187     }
188
189     /**
190      * Tests startOperation() when the operation has a preprocessor.
191      */
192     @Test
193     public void testStartWithPreprocessor() {
194         AtomicInteger count = new AtomicInteger();
195
196         CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
197             count.incrementAndGet();
198             return makeSuccess();
199         }, executor);
200
201         oper.setGuard(preproc);
202
203         verifyRun("testStartWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
204
205         assertEquals(1, count.get());
206     }
207
208     /**
209      * Tests start() with multiple running requests.
210      */
211     @Test
212     public void testStartMultiple() {
213         for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
214             oper.start();
215         }
216
217         assertTrue(executor.runAll());
218
219         assertNotNull(opstart);
220         assertNotNull(opend);
221         assertEquals(PolicyResult.SUCCESS, opend.getResult());
222
223         assertEquals(MAX_PARALLEL_REQUESTS, numStart);
224         assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
225         assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
226     }
227
228     /**
229      * Tests startPreprocessor() when the preprocessor returns a failure.
230      */
231     @Test
232     public void testStartPreprocessorFailure() {
233         oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
234
235         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
236     }
237
238     /**
239      * Tests startPreprocessor() when the preprocessor throws an exception.
240      */
241     @Test
242     public void testStartPreprocessorException() {
243         // arrange for the preprocessor to throw an exception
244         oper.setGuard(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
245
246         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
247     }
248
249     /**
250      * Tests startPreprocessor() when the pipeline is not running.
251      */
252     @Test
253     public void testStartPreprocessorNotRunning() {
254         // arrange for the preprocessor to return success, which will be ignored
255         oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
256
257         oper.start().cancel(false);
258         assertTrue(executor.runAll());
259
260         assertNull(opstart);
261         assertNull(opend);
262
263         assertEquals(0, numStart);
264         assertEquals(0, oper.getCount());
265         assertEquals(0, numEnd);
266     }
267
268     /**
269      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
270      */
271     @Test
272     public void testStartPreprocessorBuilderException() {
273         oper = new MyOper() {
274             @Override
275             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
276                 throw new IllegalStateException(EXPECTED_EXCEPTION);
277             }
278         };
279
280         assertThatIllegalStateException().isThrownBy(() -> oper.start());
281
282         // should be nothing in the queue
283         assertEquals(0, executor.getQueueLength());
284     }
285
286     @Test
287     public void testStartPreprocessorAsync() {
288         assertNull(oper.startPreprocessorAsync());
289     }
290
291     @Test
292     public void testStartGuardAsync() {
293         assertNull(oper.startGuardAsync());
294     }
295
296     @Test
297     public void testStartOperationAsync() {
298         oper.start();
299         assertTrue(executor.runAll());
300
301         assertEquals(1, oper.getCount());
302     }
303
304     @Test
305     public void testIsSuccess() {
306         OperationOutcome outcome = new OperationOutcome();
307
308         outcome.setResult(PolicyResult.SUCCESS);
309         assertTrue(oper.isSuccess(outcome));
310
311         for (PolicyResult failure : FAILURE_RESULTS) {
312             outcome.setResult(failure);
313             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
314         }
315     }
316
317     @Test
318     public void testIsActorFailed() {
319         assertFalse(oper.isActorFailed(null));
320
321         OperationOutcome outcome = params.makeOutcome();
322
323         // incorrect outcome
324         outcome.setResult(PolicyResult.SUCCESS);
325         assertFalse(oper.isActorFailed(outcome));
326
327         outcome.setResult(PolicyResult.FAILURE_RETRIES);
328         assertFalse(oper.isActorFailed(outcome));
329
330         // correct outcome
331         outcome.setResult(PolicyResult.FAILURE);
332
333         // incorrect actor
334         outcome.setActor(TARGET);
335         assertFalse(oper.isActorFailed(outcome));
336         outcome.setActor(null);
337         assertFalse(oper.isActorFailed(outcome));
338         outcome.setActor(ACTOR);
339
340         // incorrect operation
341         outcome.setOperation(TARGET);
342         assertFalse(oper.isActorFailed(outcome));
343         outcome.setOperation(null);
344         assertFalse(oper.isActorFailed(outcome));
345         outcome.setOperation(OPERATION);
346
347         // correct values
348         assertTrue(oper.isActorFailed(outcome));
349     }
350
351     @Test
352     public void testDoOperation() {
353         /*
354          * Use an operation that doesn't override doOperation().
355          */
356         OperationPartial oper2 = new OperationPartial(params, operator) {};
357
358         oper2.start();
359         assertTrue(executor.runAll());
360
361         assertNotNull(opend);
362         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
363     }
364
365     @Test
366     public void testTimeout() throws Exception {
367
368         // use a real executor
369         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
370
371         // trigger timeout very quickly
372         oper = new MyOper() {
373             @Override
374             protected long getTimeoutMs(Integer timeoutSec) {
375                 return 1;
376             }
377
378             @Override
379             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
380
381                 OperationOutcome outcome2 = params.makeOutcome();
382                 outcome2.setResult(PolicyResult.SUCCESS);
383
384                 /*
385                  * Create an incomplete future that will timeout after the operation's
386                  * timeout. If it fires before the other timer, then it will return a
387                  * SUCCESS outcome.
388                  */
389                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
390                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
391                                 params.getExecutor());
392
393                 return future;
394             }
395         };
396
397         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
398     }
399
400     /**
401      * Tests retry functions, when the count is set to zero and retries are exhausted.
402      */
403     @Test
404     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
405         params = params.toBuilder().retry(0).build();
406
407         // new params, thus need a new operation
408         oper = new MyOper();
409
410         oper.setMaxFailures(10);
411
412         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
413     }
414
415     /**
416      * Tests retry functions, when the count is null and retries are exhausted.
417      */
418     @Test
419     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
420         params = params.toBuilder().retry(null).build();
421
422         // new params, thus need a new operation
423         oper = new MyOper();
424
425         oper.setMaxFailures(10);
426
427         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
428     }
429
430     /**
431      * Tests retry functions, when retries are exhausted.
432      */
433     @Test
434     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
435         final int maxRetries = 3;
436         params = params.toBuilder().retry(maxRetries).build();
437
438         // new params, thus need a new operation
439         oper = new MyOper();
440
441         oper.setMaxFailures(10);
442
443         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
444                         PolicyResult.FAILURE_RETRIES);
445     }
446
447     /**
448      * Tests retry functions, when a success follows some retries.
449      */
450     @Test
451     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
452         params = params.toBuilder().retry(10).build();
453
454         // new params, thus need a new operation
455         oper = new MyOper();
456
457         final int maxFailures = 3;
458         oper.setMaxFailures(maxFailures);
459
460         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
461                         PolicyResult.SUCCESS);
462     }
463
464     /**
465      * Tests retry functions, when the outcome is {@code null}.
466      */
467     @Test
468     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
469
470         // arrange to return null from doOperation()
471         oper = new MyOper() {
472             @Override
473             protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
474
475                 // update counters
476                 super.doOperation(attempt, operation);
477                 return null;
478             }
479         };
480
481         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
482     }
483
484     @Test
485     public void testSleep() throws Exception {
486         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
487         assertTrue(future.isDone());
488         assertNull(future.get());
489
490         // edge case
491         future = oper.sleep(0, TimeUnit.SECONDS);
492         assertTrue(future.isDone());
493         assertNull(future.get());
494
495         /*
496          * Start a second sleep we can use to check the first while it's running.
497          */
498         tstart = Instant.now();
499         future = oper.sleep(100, TimeUnit.MILLISECONDS);
500
501         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
502
503         // wait for second to complete and verify that the first has not completed
504         future2.get();
505         assertFalse(future.isDone());
506
507         // wait for second to complete
508         future.get();
509
510         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
511         assertTrue(diff >= 99);
512     }
513
514     @Test
515     public void testIsSameOperation() {
516         assertFalse(oper.isSameOperation(null));
517
518         OperationOutcome outcome = params.makeOutcome();
519
520         // wrong actor - should be false
521         outcome.setActor(null);
522         assertFalse(oper.isSameOperation(outcome));
523         outcome.setActor(TARGET);
524         assertFalse(oper.isSameOperation(outcome));
525         outcome.setActor(ACTOR);
526
527         // wrong operation - should be null
528         outcome.setOperation(null);
529         assertFalse(oper.isSameOperation(outcome));
530         outcome.setOperation(TARGET);
531         assertFalse(oper.isSameOperation(outcome));
532         outcome.setOperation(OPERATION);
533
534         assertTrue(oper.isSameOperation(outcome));
535     }
536
537     /**
538      * Tests handleFailure() when the outcome is a success.
539      */
540     @Test
541     public void testHandlePreprocessorFailureTrue() {
542         oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
543         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
544     }
545
546     /**
547      * Tests handleFailure() when the outcome is <i>not</i> a success.
548      */
549     @Test
550     public void testHandlePreprocessorFailureFalse() throws Exception {
551         oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
552         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
553     }
554
555     /**
556      * Tests handleFailure() when the outcome is {@code null}.
557      */
558     @Test
559     public void testHandlePreprocessorFailureNull() throws Exception {
560         // arrange to return null from the preprocessor
561         oper.setGuard(CompletableFuture.completedFuture(null));
562
563         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
564     }
565
566     @Test
567     public void testFromException() {
568         // arrange to generate an exception when operation runs
569         oper.setGenException(true);
570
571         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
572     }
573
574     /**
575      * Tests fromException() when there is no exception.
576      */
577     @Test
578     public void testFromExceptionNoExcept() {
579         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
580     }
581
582     /**
583      * Tests both flavors of anyOf(), because one invokes the other.
584      */
585     @Test
586     public void testAnyOf() throws Exception {
587         // first task completes, others do not
588         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
589
590         final OperationOutcome outcome = params.makeOutcome();
591
592         tasks.add(CompletableFuture.completedFuture(outcome));
593         tasks.add(new CompletableFuture<>());
594         tasks.add(new CompletableFuture<>());
595
596         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
597         assertTrue(executor.runAll());
598
599         assertTrue(result.isDone());
600         assertSame(outcome, result.get());
601
602         // second task completes, others do not
603         tasks = new LinkedList<>();
604
605         tasks.add(new CompletableFuture<>());
606         tasks.add(CompletableFuture.completedFuture(outcome));
607         tasks.add(new CompletableFuture<>());
608
609         result = oper.anyOf(tasks);
610         assertTrue(executor.runAll());
611
612         assertTrue(result.isDone());
613         assertSame(outcome, result.get());
614
615         // third task completes, others do not
616         tasks = new LinkedList<>();
617
618         tasks.add(new CompletableFuture<>());
619         tasks.add(new CompletableFuture<>());
620         tasks.add(CompletableFuture.completedFuture(outcome));
621
622         result = oper.anyOf(tasks);
623         assertTrue(executor.runAll());
624
625         assertTrue(result.isDone());
626         assertSame(outcome, result.get());
627     }
628
629     /**
630      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
631      */
632     @Test
633     @SuppressWarnings("unchecked")
634     public void testAnyOfEdge() throws Exception {
635         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
636
637         // zero items: check both using a list and using an array
638         assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf(tasks));
639         assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf());
640
641         // one item: : check both using a list and using an array
642         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
643         tasks.add(future1);
644
645         assertSame(future1, oper.anyOf(tasks));
646         assertSame(future1, oper.anyOf(future1));
647     }
648
649     /**
650      * Tests both flavors of allOf(), because one invokes the other.
651      */
652     @Test
653     public void testAllOf() throws Exception {
654         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
655
656         final OperationOutcome outcome = params.makeOutcome();
657
658         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
659         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
660         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
661
662         tasks.add(future1);
663         tasks.add(future2);
664         tasks.add(future3);
665
666         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
667
668         assertTrue(executor.runAll());
669         assertFalse(result.isDone());
670         future1.complete(outcome);
671
672         // complete 3 before 2
673         assertTrue(executor.runAll());
674         assertFalse(result.isDone());
675         future3.complete(outcome);
676
677         assertTrue(executor.runAll());
678         assertFalse(result.isDone());
679         future2.complete(outcome);
680
681         // all of them are now done
682         assertTrue(executor.runAll());
683         assertTrue(result.isDone());
684         assertSame(outcome, result.get());
685     }
686
687     /**
688      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
689      */
690     @Test
691     @SuppressWarnings("unchecked")
692     public void testAllOfEdge() throws Exception {
693         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
694
695         // zero items: check both using a list and using an array
696         assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf(tasks));
697         assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf());
698
699         // one item: : check both using a list and using an array
700         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
701         tasks.add(future1);
702
703         assertSame(future1, oper.allOf(tasks));
704         assertSame(future1, oper.allOf(future1));
705     }
706
707     @Test
708     public void testCombineOutcomes() throws Exception {
709         // only one outcome
710         verifyOutcomes(0, PolicyResult.SUCCESS);
711         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
712
713         // maximum is in different positions
714         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
715         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
716         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
717
718         // null outcome
719         final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
720         tasks.add(CompletableFuture.completedFuture(null));
721         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
722
723         assertTrue(executor.runAll());
724         assertTrue(result.isDone());
725         assertNull(result.get());
726
727         // one throws an exception during execution
728         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
729
730         tasks.clear();
731         tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
732         tasks.add(CompletableFuture.failedFuture(except));
733         tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
734         result = oper.allOf(tasks);
735
736         assertTrue(executor.runAll());
737         assertTrue(result.isCompletedExceptionally());
738         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
739     }
740
741     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
742         List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
743
744
745         OperationOutcome expectedOutcome = null;
746
747         for (int count = 0; count < results.length; ++count) {
748             OperationOutcome outcome = params.makeOutcome();
749             outcome.setResult(results[count]);
750             tasks.add(CompletableFuture.completedFuture(outcome));
751
752             if (count == expected) {
753                 expectedOutcome = outcome;
754             }
755         }
756
757         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
758
759         assertTrue(executor.runAll());
760         assertTrue(result.isDone());
761         assertSame(expectedOutcome, result.get());
762     }
763
764     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
765                     final OperationOutcome taskOutcome) {
766
767         return outcome -> CompletableFuture.completedFuture(taskOutcome);
768     }
769
770     @Test
771     public void testDetmPriority() throws CoderException {
772         assertEquals(1, oper.detmPriority(null));
773
774         OperationOutcome outcome = params.makeOutcome();
775
776         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
777                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
778                         PolicyResult.FAILURE_EXCEPTION, 6);
779
780         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
781             outcome.setResult(ent.getKey());
782             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
783         }
784
785         /*
786          * Test null result. We can't actually set it to null, because the set() method
787          * won't allow it. Instead, we decode it from a structure.
788          */
789         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
790         assertEquals(1, oper.detmPriority(outcome));
791     }
792
793     /**
794      * Tests doTask(Future) when the controller is not running.
795      */
796     @Test
797     public void testDoTaskFutureNotRunning() throws Exception {
798         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
799
800         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
801         controller.complete(params.makeOutcome());
802
803         CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, params.makeOutcome(), taskFuture);
804         assertFalse(future.isDone());
805         assertTrue(executor.runAll());
806
807         // should not have run the task
808         assertFalse(future.isDone());
809
810         // should have canceled the task future
811         assertTrue(taskFuture.isCancelled());
812     }
813
814     /**
815      * Tests doTask(Future) when the previous outcome was successful.
816      */
817     @Test
818     public void testDoTaskFutureSuccess() throws Exception {
819         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
820         final OperationOutcome taskOutcome = params.makeOutcome();
821
822         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
823
824         CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, params.makeOutcome(), taskFuture);
825
826         taskFuture.complete(taskOutcome);
827         assertTrue(executor.runAll());
828
829         assertTrue(future.isDone());
830         assertSame(taskOutcome, future.get());
831
832         // controller should not be done yet
833         assertFalse(controller.isDone());
834     }
835
836     /**
837      * Tests doTask(Future) when the previous outcome was failed.
838      */
839     @Test
840     public void testDoTaskFutureFailure() throws Exception {
841         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
842         final OperationOutcome failedOutcome = params.makeOutcome();
843         failedOutcome.setResult(PolicyResult.FAILURE);
844
845         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
846
847         CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, failedOutcome, taskFuture);
848         assertFalse(future.isDone());
849         assertTrue(executor.runAll());
850
851         // should not have run the task
852         assertFalse(future.isDone());
853
854         // should have canceled the task future
855         assertTrue(taskFuture.isCancelled());
856
857         // controller SHOULD be done now
858         assertTrue(controller.isDone());
859         assertSame(failedOutcome, controller.get());
860     }
861
862     /**
863      * Tests doTask(Future) when the previous outcome was failed, but not checking
864      * success.
865      */
866     @Test
867     public void testDoTaskFutureUncheckedFailure() throws Exception {
868         CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
869         final OperationOutcome failedOutcome = params.makeOutcome();
870         failedOutcome.setResult(PolicyResult.FAILURE);
871
872         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
873
874         CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, failedOutcome, taskFuture);
875         assertFalse(future.isDone());
876
877         // complete the task
878         OperationOutcome taskOutcome = params.makeOutcome();
879         taskFuture.complete(taskOutcome);
880
881         assertTrue(executor.runAll());
882
883         // should have run the task
884         assertTrue(future.isDone());
885
886         assertTrue(future.isDone());
887         assertSame(taskOutcome, future.get());
888
889         // controller should not be done yet
890         assertFalse(controller.isDone());
891     }
892
893     /**
894      * Tests doTask(Function) when the controller is not running.
895      */
896     @Test
897     public void testDoTaskFunctionNotRunning() throws Exception {
898         AtomicBoolean invoked = new AtomicBoolean();
899
900         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
901             invoked.set(true);
902             return CompletableFuture.completedFuture(params.makeOutcome());
903         };
904
905         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
906         controller.complete(params.makeOutcome());
907
908         CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(params.makeOutcome());
909         assertFalse(future.isDone());
910         assertTrue(executor.runAll());
911
912         // should not have run the task
913         assertFalse(future.isDone());
914
915         // should not have even invoked the task
916         assertFalse(invoked.get());
917     }
918
919     /**
920      * Tests doTask(Function) when the previous outcome was successful.
921      */
922     @Test
923     public void testDoTaskFunctionSuccess() throws Exception {
924         final OperationOutcome taskOutcome = params.makeOutcome();
925
926         final OperationOutcome failedOutcome = params.makeOutcome();
927
928         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
929
930         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
931
932         CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
933
934         assertTrue(future.isDone());
935         assertSame(taskOutcome, future.get());
936
937         // controller should not be done yet
938         assertFalse(controller.isDone());
939     }
940
941     /**
942      * Tests doTask(Function) when the previous outcome was failed.
943      */
944     @Test
945     public void testDoTaskFunctionFailure() throws Exception {
946         final OperationOutcome failedOutcome = params.makeOutcome();
947         failedOutcome.setResult(PolicyResult.FAILURE);
948
949         AtomicBoolean invoked = new AtomicBoolean();
950
951         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
952             invoked.set(true);
953             return CompletableFuture.completedFuture(params.makeOutcome());
954         };
955
956         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
957
958         CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
959         assertFalse(future.isDone());
960         assertTrue(executor.runAll());
961
962         // should not have run the task
963         assertFalse(future.isDone());
964
965         // should not have even invoked the task
966         assertFalse(invoked.get());
967
968         // controller should have the failed task
969         assertTrue(controller.isDone());
970         assertSame(failedOutcome, controller.get());
971     }
972
973     /**
974      * Tests doTask(Function) when the previous outcome was failed, but not checking
975      * success.
976      */
977     @Test
978     public void testDoTaskFunctionUncheckedFailure() throws Exception {
979         final OperationOutcome taskOutcome = params.makeOutcome();
980
981         final OperationOutcome failedOutcome = params.makeOutcome();
982         failedOutcome.setResult(PolicyResult.FAILURE);
983
984         Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
985
986         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
987
988         CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(failedOutcome);
989
990         assertTrue(future.isDone());
991         assertSame(taskOutcome, future.get());
992
993         // controller should not be done yet
994         assertFalse(controller.isDone());
995     }
996
997     /**
998      * Tests callbackStarted() when the pipeline has already been stopped.
999      */
1000     @Test
1001     public void testCallbackStartedNotRunning() {
1002         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1003
1004         /*
1005          * arrange to stop the controller when the start-callback is invoked, but capture
1006          * the outcome
1007          */
1008         params = params.toBuilder().startCallback(oper -> {
1009             starter(oper);
1010             future.get().cancel(false);
1011         }).build();
1012
1013         // new params, thus need a new operation
1014         oper = new MyOper();
1015
1016         future.set(oper.start());
1017         assertTrue(executor.runAll());
1018
1019         // should have only run once
1020         assertEquals(1, numStart);
1021     }
1022
1023     /**
1024      * Tests callbackCompleted() when the pipeline has already been stopped.
1025      */
1026     @Test
1027     public void testCallbackCompletedNotRunning() {
1028         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1029
1030         // arrange to stop the controller when the start-callback is invoked
1031         params = params.toBuilder().startCallback(oper -> {
1032             future.get().cancel(false);
1033         }).build();
1034
1035         // new params, thus need a new operation
1036         oper = new MyOper();
1037
1038         future.set(oper.start());
1039         assertTrue(executor.runAll());
1040
1041         // should not have been set
1042         assertNull(opend);
1043         assertEquals(0, numEnd);
1044     }
1045
1046     @Test
1047     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1048         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1049
1050         OperationOutcome outcome;
1051
1052         outcome = new OperationOutcome();
1053         oper.setOutcome(outcome, timex);
1054         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1055         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1056
1057         outcome = new OperationOutcome();
1058         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1059         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1060         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1061     }
1062
1063     @Test
1064     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1065         OperationOutcome outcome;
1066
1067         outcome = new OperationOutcome();
1068         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1069         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1070         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1071
1072         for (PolicyResult result : FAILURE_RESULTS) {
1073             outcome = new OperationOutcome();
1074             oper.setOutcome(outcome, result);
1075             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1076             assertEquals(result.toString(), result, outcome.getResult());
1077         }
1078     }
1079
1080     @Test
1081     public void testIsTimeout() {
1082         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1083
1084         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1085         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1086         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1087         assertFalse(oper.isTimeout(new CompletionException(null)));
1088         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1089
1090         assertTrue(oper.isTimeout(timex));
1091         assertTrue(oper.isTimeout(new CompletionException(timex)));
1092     }
1093
1094     @Test
1095     public void testGetRetry() {
1096         assertEquals(0, oper.getRetry(null));
1097         assertEquals(10, oper.getRetry(10));
1098     }
1099
1100     @Test
1101     public void testGetRetryWait() {
1102         // need an operator that doesn't override the retry time
1103         OperationPartial oper2 = new OperationPartial(params, operator) {};
1104         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1105     }
1106
1107     @Test
1108     public void testGetTimeOutMs() {
1109         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1110
1111         params = params.toBuilder().timeoutSec(null).build();
1112
1113         // new params, thus need a new operation
1114         oper = new MyOper();
1115
1116         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1117     }
1118
1119     private void starter(OperationOutcome oper) {
1120         ++numStart;
1121         tstart = oper.getStart();
1122         opstart = oper;
1123     }
1124
1125     private void completer(OperationOutcome oper) {
1126         ++numEnd;
1127         opend = oper;
1128     }
1129
1130     /**
1131      * Gets a function that does nothing.
1132      *
1133      * @param <T> type of input parameter expected by the function
1134      * @return a function that does nothing
1135      */
1136     private <T> Consumer<T> noop() {
1137         return unused -> {
1138         };
1139     }
1140
1141     private OperationOutcome makeSuccess() {
1142         OperationOutcome outcome = params.makeOutcome();
1143         outcome.setResult(PolicyResult.SUCCESS);
1144
1145         return outcome;
1146     }
1147
1148     private OperationOutcome makeFailure() {
1149         OperationOutcome outcome = params.makeOutcome();
1150         outcome.setResult(PolicyResult.FAILURE);
1151
1152         return outcome;
1153     }
1154
1155     /**
1156      * Verifies a run.
1157      *
1158      * @param testName test name
1159      * @param expectedCallbacks number of callbacks expected
1160      * @param expectedOperations number of operation invocations expected
1161      * @param expectedResult expected outcome
1162      */
1163     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1164                     PolicyResult expectedResult) {
1165
1166         String expectedSubRequestId =
1167                         (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1168
1169         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1170     }
1171
1172     /**
1173      * Verifies a run.
1174      *
1175      * @param testName test name
1176      * @param expectedCallbacks number of callbacks expected
1177      * @param expectedOperations number of operation invocations expected
1178      * @param expectedResult expected outcome
1179      * @param expectedSubRequestId expected sub request ID
1180      * @param manipulator function to modify the future returned by
1181      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1182      *        in the executor are run
1183      */
1184     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1185                     String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1186
1187         CompletableFuture<OperationOutcome> future = oper.start();
1188
1189         manipulator.accept(future);
1190
1191         assertTrue(testName, executor.runAll());
1192
1193         assertEquals(testName, expectedCallbacks, numStart);
1194         assertEquals(testName, expectedCallbacks, numEnd);
1195
1196         if (expectedCallbacks > 0) {
1197             assertNotNull(testName, opstart);
1198             assertNotNull(testName, opend);
1199             assertEquals(testName, expectedResult, opend.getResult());
1200
1201             assertSame(testName, tstart, opstart.getStart());
1202             assertSame(testName, tstart, opend.getStart());
1203
1204             try {
1205                 assertTrue(future.isDone());
1206                 assertSame(testName, opend, future.get());
1207
1208             } catch (InterruptedException | ExecutionException e) {
1209                 throw new IllegalStateException(e);
1210             }
1211
1212             if (expectedOperations > 0) {
1213                 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1214             }
1215         }
1216
1217         assertEquals(testName, expectedOperations, oper.getCount());
1218     }
1219
1220     private class MyOper extends OperationPartial {
1221         @Getter
1222         private int count = 0;
1223
1224         @Setter
1225         private boolean genException;
1226
1227         @Setter
1228         private int maxFailures = 0;
1229
1230         @Setter
1231         private CompletableFuture<OperationOutcome> guard;
1232
1233
1234         public MyOper() {
1235             super(OperationPartialTest.this.params, operator);
1236         }
1237
1238         @Override
1239         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1240             ++count;
1241             if (genException) {
1242                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1243             }
1244
1245             operation.setSubRequestId(String.valueOf(attempt));
1246
1247             if (count > maxFailures) {
1248                 operation.setResult(PolicyResult.SUCCESS);
1249             } else {
1250                 operation.setResult(PolicyResult.FAILURE);
1251             }
1252
1253             return operation;
1254         }
1255
1256         @Override
1257         protected CompletableFuture<OperationOutcome> startGuardAsync() {
1258             return (guard != null ? guard : super.startGuardAsync());
1259         }
1260
1261         @Override
1262         protected long getRetryWaitMs() {
1263             /*
1264              * Sleep timers run in the background, but we want to control things via the
1265              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1266              */
1267             return 0L;
1268         }
1269     }
1270
1271     /**
1272      * Executor that will run tasks until the queue is empty or a maximum number of tasks
1273      * have been executed. Doesn't actually run anything until {@link #runAll()} is
1274      * invoked.
1275      */
1276     private static class MyExec implements Executor {
1277         private static final int MAX_TASKS = MAX_PARALLEL_REQUESTS * 100;
1278
1279         private Queue<Runnable> commands = new LinkedList<>();
1280
1281         public MyExec() {
1282             // do nothing
1283         }
1284
1285         public int getQueueLength() {
1286             return commands.size();
1287         }
1288
1289         @Override
1290         public void execute(Runnable command) {
1291             commands.add(command);
1292         }
1293
1294         public boolean runAll() {
1295             for (int count = 0; count < MAX_TASKS && !commands.isEmpty(); ++count) {
1296                 commands.remove().run();
1297             }
1298
1299             return commands.isEmpty();
1300         }
1301     }
1302 }