Actor redesign.
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperatorPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertNull;
28 import static org.junit.Assert.assertSame;
29 import static org.junit.Assert.assertTrue;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33
34 import java.time.Instant;
35 import java.util.Arrays;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Queue;
40 import java.util.TreeMap;
41 import java.util.UUID;
42 import java.util.concurrent.CompletableFuture;
43 import java.util.concurrent.CompletionException;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.Executor;
46 import java.util.concurrent.ForkJoinPool;
47 import java.util.concurrent.Future;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.TimeoutException;
50 import java.util.concurrent.atomic.AtomicInteger;
51 import java.util.concurrent.atomic.AtomicReference;
52 import java.util.function.Consumer;
53 import java.util.function.Function;
54 import java.util.stream.Collectors;
55 import lombok.Getter;
56 import lombok.Setter;
57 import org.junit.Before;
58 import org.junit.Test;
59 import org.onap.policy.controlloop.ControlLoopOperation;
60 import org.onap.policy.controlloop.VirtualControlLoopEvent;
61 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
62 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
63 import org.onap.policy.controlloop.policy.Policy;
64 import org.onap.policy.controlloop.policy.PolicyResult;
65
66 public class OperatorPartialTest {
67     private static final int MAX_PARALLEL_REQUESTS = 10;
68     private static final String EXPECTED_EXCEPTION = "expected exception";
69     private static final String ACTOR = "my-actor";
70     private static final String OPERATOR = "my-operator";
71     private static final String TARGET = "my-target";
72     private static final int TIMEOUT = 1000;
73     private static final UUID REQ_ID = UUID.randomUUID();
74
75     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
76                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
77
78     private static final List<String> FAILURE_STRINGS =
79                     FAILURE_RESULTS.stream().map(Object::toString).collect(Collectors.toList());
80
81     private VirtualControlLoopEvent event;
82     private Map<String, Object> config;
83     private ControlLoopEventContext context;
84     private MyExec executor;
85     private Policy policy;
86     private ControlLoopOperationParams params;
87
88     private MyOper oper;
89
90     private int numStart;
91     private int numEnd;
92
93     private Instant tstart;
94
95     private ControlLoopOperation opstart;
96     private ControlLoopOperation opend;
97
98     /**
99      * Initializes the fields, including {@link #oper}.
100      */
101     @Before
102     public void setUp() {
103         event = new VirtualControlLoopEvent();
104         event.setRequestId(REQ_ID);
105
106         config = new TreeMap<>();
107         context = new ControlLoopEventContext(event);
108         executor = new MyExec();
109
110         policy = new Policy();
111         policy.setActor(ACTOR);
112         policy.setRecipe(OPERATOR);
113         policy.setTimeout(TIMEOUT);
114
115         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
116                         .executor(executor).policy(policy).startCallback(this::starter).target(TARGET).build();
117
118         oper = new MyOper();
119         oper.configure(new TreeMap<>());
120         oper.start();
121
122         tstart = null;
123
124         opstart = null;
125         opend = null;
126     }
127
128     @Test
129     public void testOperatorPartial_testGetActorName_testGetName() {
130         assertEquals(ACTOR, oper.getActorName());
131         assertEquals(OPERATOR, oper.getName());
132         assertEquals(ACTOR + "." + OPERATOR, oper.getFullName());
133     }
134
135     @Test
136     public void testDoStart() {
137         oper = spy(new MyOper());
138
139         oper.configure(config);
140         oper.start();
141
142         verify(oper).doStart();
143
144         // others should not have been invoked
145         verify(oper, never()).doStop();
146         verify(oper, never()).doShutdown();
147     }
148
149     @Test
150     public void testDoStop() {
151         oper = spy(new MyOper());
152
153         oper.configure(config);
154         oper.start();
155         oper.stop();
156
157         verify(oper).doStop();
158
159         // should not have been re-invoked
160         verify(oper).doStart();
161
162         // others should not have been invoked
163         verify(oper, never()).doShutdown();
164     }
165
166     @Test
167     public void testDoShutdown() {
168         oper = spy(new MyOper());
169
170         oper.configure(config);
171         oper.start();
172         oper.shutdown();
173
174         verify(oper).doShutdown();
175
176         // should not have been re-invoked
177         verify(oper).doStart();
178
179         // others should not have been invoked
180         verify(oper, never()).doStop();
181     }
182
183     @Test
184     public void testStartOperation_testVerifyRunning() {
185         verifyRun("testStartOperation", 1, 1, PolicyResult.SUCCESS);
186     }
187
188     /**
189      * Tests startOperation() when the operator is not running.
190      */
191     @Test
192     public void testStartOperationNotRunning() {
193         // use a new operator, one that hasn't been started yet
194         oper = new MyOper();
195         oper.configure(new TreeMap<>());
196
197         assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
198     }
199
200     /**
201      * Tests startOperation() when the operation has a preprocessor.
202      */
203     @Test
204     public void testStartOperationWithPreprocessor_testStartPreprocessor() {
205         AtomicInteger count = new AtomicInteger();
206
207         // @formatter:off
208         Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
209             oper -> CompletableFuture.supplyAsync(() -> {
210                 count.incrementAndGet();
211                 oper.setOutcome(PolicyResult.SUCCESS.toString());
212                 return oper;
213             }, executor);
214         // @formatter:on
215
216         oper.setPreProcessor(preproc);
217
218         verifyRun("testStartOperationWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
219
220         assertEquals(1, count.get());
221     }
222
223     /**
224      * Tests startOperation() with multiple running requests.
225      */
226     @Test
227     public void testStartOperationMultiple() {
228         for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
229             oper.startOperation(params);
230         }
231
232         assertTrue(executor.runAll());
233
234         assertNotNull(opstart);
235         assertNotNull(opend);
236         assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome());
237
238         assertEquals(MAX_PARALLEL_REQUESTS, numStart);
239         assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
240         assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
241     }
242
243     /**
244      * Tests startPreprocessor() when the preprocessor returns a failure.
245      */
246     @Test
247     public void testStartPreprocessorFailure() {
248         // arrange for the preprocessor to return a failure
249         oper.setPreProcessor(oper -> {
250             oper.setOutcome(PolicyResult.FAILURE_GUARD.toString());
251             return CompletableFuture.completedFuture(oper);
252         });
253
254         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
255     }
256
257     /**
258      * Tests startPreprocessor() when the preprocessor throws an exception.
259      */
260     @Test
261     public void testStartPreprocessorException() {
262         // arrange for the preprocessor to throw an exception
263         oper.setPreProcessor(oper -> {
264             throw new IllegalStateException(EXPECTED_EXCEPTION);
265         });
266
267         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
268     }
269
270     /**
271      * Tests startPreprocessor() when the pipeline is not running.
272      */
273     @Test
274     public void testStartPreprocessorNotRunning() {
275         // arrange for the preprocessor to return success, which will be ignored
276         oper.setPreProcessor(oper -> {
277             oper.setOutcome(PolicyResult.SUCCESS.toString());
278             return CompletableFuture.completedFuture(oper);
279         });
280
281         oper.startOperation(params).cancel(false);
282         assertTrue(executor.runAll());
283
284         assertNull(opstart);
285         assertNull(opend);
286
287         assertEquals(0, numStart);
288         assertEquals(0, oper.getCount());
289         assertEquals(0, numEnd);
290     }
291
292     /**
293      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
294      */
295     @Test
296     public void testStartPreprocessorBuilderException() {
297         oper = new MyOper() {
298             @Override
299             protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
300                             ControlLoopOperationParams params) {
301                 throw new IllegalStateException(EXPECTED_EXCEPTION);
302             }
303         };
304
305         oper.configure(new TreeMap<>());
306         oper.start();
307
308         assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
309
310         // should be nothing in the queue
311         assertEquals(0, executor.getQueueLength());
312     }
313
314     @Test
315     public void testDoPreprocessorAsFuture() {
316         assertNull(oper.doPreprocessorAsFuture(params));
317     }
318
319     @Test
320     public void testStartOperationOnly_testDoOperationAsFuture() {
321         oper.startOperation(params);
322         assertTrue(executor.runAll());
323
324         assertEquals(1, oper.getCount());
325     }
326
327     /**
328      * Tests startOperationOnce() when
329      * {@link OperatorPartial#doOperationAsFuture(ControlLoopOperationParams)} throws an
330      * exception.
331      */
332     @Test
333     public void testStartOperationOnceBuilderException() {
334         oper = new MyOper() {
335             @Override
336             protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
337                             ControlLoopOperationParams params, int attempt) {
338                 throw new IllegalStateException(EXPECTED_EXCEPTION);
339             }
340         };
341
342         oper.configure(new TreeMap<>());
343         oper.start();
344
345         assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
346
347         // should be nothing in the queue
348         assertEquals(0, executor.getQueueLength());
349     }
350
351     @Test
352     public void testIsSuccess() {
353         ControlLoopOperation outcome = new ControlLoopOperation();
354
355         outcome.setOutcome(PolicyResult.SUCCESS.toString());
356         assertTrue(oper.isSuccess(outcome));
357
358         for (String failure : FAILURE_STRINGS) {
359             outcome.setOutcome(failure);
360             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
361         }
362     }
363
364     @Test
365     public void testIsActorFailed() {
366         assertFalse(oper.isActorFailed(null));
367
368         ControlLoopOperation outcome = params.makeOutcome();
369
370         // incorrect outcome
371         outcome.setOutcome(PolicyResult.SUCCESS.toString());
372         assertFalse(oper.isActorFailed(outcome));
373
374         outcome.setOutcome(PolicyResult.FAILURE_RETRIES.toString());
375         assertFalse(oper.isActorFailed(outcome));
376
377         // correct outcome
378         outcome.setOutcome(PolicyResult.FAILURE.toString());
379
380         // incorrect actor
381         outcome.setActor(TARGET);
382         assertFalse(oper.isActorFailed(outcome));
383         outcome.setActor(null);
384         assertFalse(oper.isActorFailed(outcome));
385         outcome.setActor(ACTOR);
386
387         // incorrect operation
388         outcome.setOperation(TARGET);
389         assertFalse(oper.isActorFailed(outcome));
390         outcome.setOperation(null);
391         assertFalse(oper.isActorFailed(outcome));
392         outcome.setOperation(OPERATOR);
393
394         // correct values
395         assertTrue(oper.isActorFailed(outcome));
396     }
397
398     @Test
399     public void testDoOperation() {
400         /*
401          * Use an operator that doesn't override doOperation().
402          */
403         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {};
404
405         oper2.configure(new TreeMap<>());
406         oper2.start();
407
408         oper2.startOperation(params);
409         assertTrue(executor.runAll());
410
411         assertNotNull(opend);
412         assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), opend.getOutcome());
413     }
414
415     @Test
416     public void testTimeout() throws Exception {
417
418         // use a real executor
419         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
420
421         // trigger timeout very quickly
422         oper = new MyOper() {
423             @Override
424             protected long getTimeOutMillis(Policy policy) {
425                 return 1;
426             }
427
428             @Override
429             protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
430                             ControlLoopOperationParams params, int attempt) {
431
432                 return outcome -> {
433                     ControlLoopOperation outcome2 = params.makeOutcome();
434                     outcome2.setOutcome(PolicyResult.SUCCESS.toString());
435
436                     /*
437                      * Create an incomplete future that will timeout after the operation's
438                      * timeout. If it fires before the other timer, then it will return a
439                      * SUCCESS outcome.
440                      */
441                     CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>();
442                     future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
443                                     params.getExecutor());
444
445                     return future;
446                 };
447             }
448         };
449
450         oper.configure(new TreeMap<>());
451         oper.start();
452
453         assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), oper.startOperation(params).get().getOutcome());
454     }
455
456     /**
457      * Verifies that the timer doesn't encompass the preprocessor and doesn't stop the
458      * operation once the preprocessor completes.
459      */
460     @Test
461     public void testTimeoutInPreprocessor() throws Exception {
462
463         // use a real executor
464         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
465
466         // trigger timeout very quickly
467         oper = new MyOper() {
468             @Override
469             protected long getTimeOutMillis(Policy policy) {
470                 return 10;
471             }
472
473             @Override
474             protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
475                             ControlLoopOperationParams params) {
476
477                 return outcome -> {
478                     outcome.setOutcome(PolicyResult.SUCCESS.toString());
479
480                     /*
481                      * Create an incomplete future that will timeout after the operation's
482                      * timeout. If it fires before the other timer, then it will return a
483                      * SUCCESS outcome.
484                      */
485                     CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>();
486                     future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome,
487                                     params.getExecutor());
488
489                     return future;
490                 };
491             }
492         };
493
494         oper.configure(new TreeMap<>());
495         oper.start();
496
497         ControlLoopOperation result = oper.startOperation(params).get();
498         assertEquals(PolicyResult.SUCCESS.toString(), result.getOutcome());
499
500         assertNotNull(opstart);
501         assertNotNull(opend);
502         assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome());
503
504         assertEquals(1, numStart);
505         assertEquals(1, oper.getCount());
506         assertEquals(1, numEnd);
507     }
508
509     /**
510      * Tests retry functions, when the count is set to zero and retries are exhausted.
511      */
512     @Test
513     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries() {
514         policy.setRetry(0);
515         oper.setMaxFailures(10);
516
517         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
518     }
519
520     /**
521      * Tests retry functions, when the count is null and retries are exhausted.
522      */
523     @Test
524     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
525         policy.setRetry(null);
526         oper.setMaxFailures(10);
527
528         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
529     }
530
531     /**
532      * Tests retry functions, when retries are exhausted.
533      */
534     @Test
535     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
536         final int maxRetries = 3;
537         policy.setRetry(maxRetries);
538         oper.setMaxFailures(10);
539
540         verifyRun("testVerifyRunningWhenNot", maxRetries + 1, maxRetries + 1, PolicyResult.FAILURE_RETRIES);
541     }
542
543     /**
544      * Tests retry functions, when a success follows some retries.
545      */
546     @Test
547     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
548         policy.setRetry(10);
549
550         final int maxFailures = 3;
551         oper.setMaxFailures(maxFailures);
552
553         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
554                         PolicyResult.SUCCESS);
555     }
556
557     /**
558      * Tests retry functions, when the outcome is {@code null}.
559      */
560     @Test
561     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
562
563         // arrange to return null from doOperation()
564         oper = new MyOper() {
565             @Override
566             protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
567                             ControlLoopOperation operation) {
568
569                 // update counters
570                 super.doOperation(params, attempt, operation);
571                 return null;
572             }
573         };
574
575         oper.configure(new TreeMap<>());
576         oper.start();
577
578         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
579     }
580
581     @Test
582     public void testGetActorOutcome() {
583         assertNull(oper.getActorOutcome(null));
584
585         ControlLoopOperation outcome = params.makeOutcome();
586         outcome.setOutcome(TARGET);
587
588         // wrong actor - should be null
589         outcome.setActor(null);
590         assertNull(oper.getActorOutcome(outcome));
591         outcome.setActor(TARGET);
592         assertNull(oper.getActorOutcome(outcome));
593         outcome.setActor(ACTOR);
594
595         // wrong operation - should be null
596         outcome.setOperation(null);
597         assertNull(oper.getActorOutcome(outcome));
598         outcome.setOperation(TARGET);
599         assertNull(oper.getActorOutcome(outcome));
600         outcome.setOperation(OPERATOR);
601
602         assertEquals(TARGET, oper.getActorOutcome(outcome));
603     }
604
605     @Test
606     public void testOnSuccess() throws Exception {
607         AtomicInteger count = new AtomicInteger();
608
609         final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep = oper -> {
610             count.incrementAndGet();
611             return CompletableFuture.completedFuture(oper);
612         };
613
614         // pass it a null outcome
615         ControlLoopOperation outcome = oper.onSuccess(params, nextStep).apply(null).get();
616         assertNotNull(outcome);
617         assertEquals(PolicyResult.FAILURE.toString(), outcome.getOutcome());
618         assertEquals(0, count.get());
619
620         // pass it an unpopulated (i.e., failed) outcome
621         outcome = new ControlLoopOperation();
622         assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get());
623         assertEquals(0, count.get());
624
625         // pass it a successful outcome
626         outcome = params.makeOutcome();
627         outcome.setOutcome(PolicyResult.SUCCESS.toString());
628         assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get());
629         assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome());
630         assertEquals(1, count.get());
631     }
632
633     /**
634      * Tests onSuccess() and handleFailure() when the outcome is a success.
635      */
636     @Test
637     public void testOnSuccessTrue_testHandleFailureTrue() {
638         // arrange to return a success from the preprocessor
639         oper.setPreProcessor(oper -> {
640             oper.setOutcome(PolicyResult.SUCCESS.toString());
641             return CompletableFuture.completedFuture(oper);
642         });
643
644         verifyRun("testOnSuccessTrue_testHandleFailureTrue", 1, 1, PolicyResult.SUCCESS);
645     }
646
647     /**
648      * Tests onSuccess() and handleFailure() when the outcome is <i>not</i> a success.
649      */
650     @Test
651     public void testOnSuccessFalse_testHandleFailureFalse() throws Exception {
652         // arrange to return a failure from the preprocessor
653         oper.setPreProcessor(oper -> {
654             oper.setOutcome(PolicyResult.FAILURE.toString());
655             return CompletableFuture.completedFuture(oper);
656         });
657
658         verifyRun("testOnSuccessFalse_testHandleFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
659     }
660
661     /**
662      * Tests onSuccess() and handleFailure() when the outcome is {@code null}.
663      */
664     @Test
665     public void testOnSuccessFalse_testHandleFailureNull() throws Exception {
666         // arrange to return null from the preprocessor
667         oper.setPreProcessor(oper -> {
668             return CompletableFuture.completedFuture(null);
669         });
670
671         verifyRun("testOnSuccessFalse_testHandleFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
672     }
673
674     @Test
675     public void testFromException() {
676         // arrange to generate an exception when operation runs
677         oper.setGenException(true);
678
679         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
680     }
681
682     /**
683      * Tests fromException() when there is no exception.
684      */
685     @Test
686     public void testFromExceptionNoExcept() {
687         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
688     }
689
690     /**
691      * Tests verifyRunning() when the pipeline is not running.
692      */
693     @Test
694     public void testVerifyRunningWhenNot() {
695         verifyRun("testVerifyRunningWhenNot", 0, 0, PolicyResult.SUCCESS, future -> future.cancel(false));
696     }
697
698     /**
699      * Tests callbackStarted() when the pipeline has already been stopped.
700      */
701     @Test
702     public void testCallbackStartedNotRunning() {
703         AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>();
704
705         /*
706          * arrange to stop the controller when the start-callback is invoked, but capture
707          * the outcome
708          */
709         params = params.toBuilder().startCallback(oper -> {
710             starter(oper);
711             future.get().cancel(false);
712         }).build();
713
714         future.set(oper.startOperation(params));
715         assertTrue(executor.runAll());
716
717         // should have only run once
718         assertEquals(1, numStart);
719     }
720
721     /**
722      * Tests callbackCompleted() when the pipeline has already been stopped.
723      */
724     @Test
725     public void testCallbackCompletedNotRunning() {
726         AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>();
727
728         // arrange to stop the controller when the start-callback is invoked
729         params = params.toBuilder().startCallback(oper -> {
730             future.get().cancel(false);
731         }).build();
732
733         future.set(oper.startOperation(params));
734         assertTrue(executor.runAll());
735
736         // should not have been set
737         assertNull(opend);
738         assertEquals(0, numEnd);
739     }
740
741     @Test
742     public void testSetOutcomeControlLoopOperationThrowable() {
743         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
744
745         ControlLoopOperation outcome;
746
747         outcome = new ControlLoopOperation();
748         oper.setOutcome(params, outcome, timex);
749         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
750         assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), outcome.getOutcome());
751
752         outcome = new ControlLoopOperation();
753         oper.setOutcome(params, outcome, new IllegalStateException());
754         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
755         assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), outcome.getOutcome());
756     }
757
758     @Test
759     public void testSetOutcomeControlLoopOperationPolicyResult() {
760         ControlLoopOperation outcome;
761
762         outcome = new ControlLoopOperation();
763         oper.setOutcome(params, outcome, PolicyResult.SUCCESS);
764         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
765         assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome());
766
767         for (PolicyResult result : FAILURE_RESULTS) {
768             outcome = new ControlLoopOperation();
769             oper.setOutcome(params, outcome, result);
770             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
771             assertEquals(result.toString(), result.toString(), outcome.getOutcome());
772         }
773     }
774
775     @Test
776     public void testIsTimeout() {
777         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
778
779         assertFalse(oper.isTimeout(new IllegalStateException()));
780         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
781         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
782         assertFalse(oper.isTimeout(new CompletionException(null)));
783         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
784
785         assertTrue(oper.isTimeout(timex));
786         assertTrue(oper.isTimeout(new CompletionException(timex)));
787     }
788
789     @Test
790     public void testGetTimeOutMillis() {
791         assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(policy));
792
793         policy.setTimeout(null);
794         assertEquals(0, oper.getTimeOutMillis(policy));
795     }
796
797     private void starter(ControlLoopOperation oper) {
798         ++numStart;
799         tstart = oper.getStart();
800         opstart = oper;
801     }
802
803     private void completer(ControlLoopOperation oper) {
804         ++numEnd;
805         opend = oper;
806     }
807
808     /**
809      * Gets a function that does nothing.
810      *
811      * @param <T> type of input parameter expected by the function
812      * @return a function that does nothing
813      */
814     private <T> Consumer<T> noop() {
815         return unused -> {
816         };
817     }
818
819     /**
820      * Verifies a run.
821      *
822      * @param testName test name
823      * @param expectedCallbacks number of callbacks expected
824      * @param expectedOperations number of operation invocations expected
825      * @param expectedResult expected outcome
826      */
827     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
828                     PolicyResult expectedResult) {
829
830         String expectedSubRequestId =
831                         (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
832
833         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
834     }
835
836     /**
837      * Verifies a run.
838      *
839      * @param testName test name
840      * @param expectedCallbacks number of callbacks expected
841      * @param expectedOperations number of operation invocations expected
842      * @param expectedResult expected outcome
843      * @param manipulator function to modify the future returned by
844      *        {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before
845      *        the tasks in the executor are run
846      */
847     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
848                     Consumer<CompletableFuture<ControlLoopOperation>> manipulator) {
849
850         String expectedSubRequestId =
851                         (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
852
853         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, manipulator);
854     }
855
856     /**
857      * Verifies a run.
858      *
859      * @param testName test name
860      * @param expectedCallbacks number of callbacks expected
861      * @param expectedOperations number of operation invocations expected
862      * @param expectedResult expected outcome
863      * @param expectedSubRequestId expected sub request ID
864      * @param manipulator function to modify the future returned by
865      *        {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before
866      *        the tasks in the executor are run
867      */
868     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
869                     String expectedSubRequestId, Consumer<CompletableFuture<ControlLoopOperation>> manipulator) {
870
871         CompletableFuture<ControlLoopOperation> future = oper.startOperation(params);
872
873         manipulator.accept(future);
874
875         assertTrue(testName, executor.runAll());
876
877         assertEquals(testName, expectedCallbacks, numStart);
878         assertEquals(testName, expectedCallbacks, numEnd);
879
880         if (expectedCallbacks > 0) {
881             assertNotNull(testName, opstart);
882             assertNotNull(testName, opend);
883             assertEquals(testName, expectedResult.toString(), opend.getOutcome());
884
885             assertSame(testName, tstart, opstart.getStart());
886             assertSame(testName, tstart, opend.getStart());
887
888             try {
889                 assertTrue(future.isDone());
890                 assertSame(testName, opend, future.get());
891
892             } catch (InterruptedException | ExecutionException e) {
893                 throw new IllegalStateException(e);
894             }
895
896             if (expectedOperations > 0) {
897                 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
898             }
899         }
900
901         assertEquals(testName, expectedOperations, oper.getCount());
902     }
903
904     private static class MyOper extends OperatorPartial {
905         @Getter
906         private int count = 0;
907
908         @Setter
909         private boolean genException;
910
911         @Setter
912         private int maxFailures = 0;
913
914         @Setter
915         private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preProcessor;
916
917         public MyOper() {
918             super(ACTOR, OPERATOR);
919         }
920
921         @Override
922         protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
923                         ControlLoopOperation operation) {
924             ++count;
925             if (genException) {
926                 throw new IllegalStateException(EXPECTED_EXCEPTION);
927             }
928
929             operation.setSubRequestId(String.valueOf(attempt));
930
931             if (count > maxFailures) {
932                 operation.setOutcome(PolicyResult.SUCCESS.toString());
933             } else {
934                 operation.setOutcome(PolicyResult.FAILURE.toString());
935             }
936
937             return operation;
938         }
939
940         @Override
941         protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
942                         ControlLoopOperationParams params) {
943
944             return (preProcessor != null ? preProcessor : super.doPreprocessorAsFuture(params));
945         }
946     }
947
948     /**
949      * Executor that will run tasks until the queue is empty or a maximum number of tasks
950      * have been executed.
951      */
952     private static class MyExec implements Executor {
953         private static final int MAX_TASKS = MAX_PARALLEL_REQUESTS * 100;
954
955         private Queue<Runnable> commands = new LinkedList<>();
956
957         public MyExec() {
958             // do nothing
959         }
960
961         public int getQueueLength() {
962             return commands.size();
963         }
964
965         @Override
966         public void execute(Runnable command) {
967             commands.add(command);
968         }
969
970         public boolean runAll() {
971             for (int count = 0; count < MAX_TASKS && !commands.isEmpty(); ++count) {
972                 commands.remove().run();
973             }
974
975             return commands.isEmpty();
976         }
977     }
978 }