More actor clean-up
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31
32 import ch.qos.logback.classic.Logger;
33 import java.time.Instant;
34 import java.util.Arrays;
35 import java.util.LinkedList;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Map.Entry;
39 import java.util.UUID;
40 import java.util.concurrent.CompletableFuture;
41 import java.util.concurrent.CompletionException;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.ForkJoinPool;
44 import java.util.concurrent.Future;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.TimeoutException;
47 import java.util.concurrent.atomic.AtomicInteger;
48 import java.util.concurrent.atomic.AtomicReference;
49 import java.util.function.Consumer;
50 import java.util.function.Supplier;
51 import java.util.stream.Collectors;
52 import lombok.Getter;
53 import lombok.Setter;
54 import org.junit.AfterClass;
55 import org.junit.Before;
56 import org.junit.BeforeClass;
57 import org.junit.Test;
58 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
59 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
60 import org.onap.policy.common.utils.coder.Coder;
61 import org.onap.policy.common.utils.coder.CoderException;
62 import org.onap.policy.common.utils.coder.StandardCoder;
63 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
64 import org.onap.policy.common.utils.time.PseudoExecutor;
65 import org.onap.policy.controlloop.ControlLoopOperation;
66 import org.onap.policy.controlloop.VirtualControlLoopEvent;
67 import org.onap.policy.controlloop.actorserviceprovider.Operation;
68 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
69 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
70 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
71 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
72 import org.onap.policy.controlloop.policy.PolicyResult;
73 import org.slf4j.LoggerFactory;
74
75 public class OperationPartialTest {
76     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
77     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
78     private static final int MAX_REQUESTS = 100;
79     private static final int MAX_PARALLEL = 10;
80     private static final String EXPECTED_EXCEPTION = "expected exception";
81     private static final String ACTOR = "my-actor";
82     private static final String OPERATION = "my-operation";
83     private static final String MY_SINK = "my-sink";
84     private static final String MY_SOURCE = "my-source";
85     private static final String TEXT = "my-text";
86     private static final int TIMEOUT = 1000;
87     private static final UUID REQ_ID = UUID.randomUUID();
88
89     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
90                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
91
92     /**
93      * Used to attach an appender to the class' logger.
94      */
95     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
96     private static final ExtractAppender appender = new ExtractAppender();
97
98     private VirtualControlLoopEvent event;
99     private ControlLoopEventContext context;
100     private PseudoExecutor executor;
101     private ControlLoopOperationParams params;
102
103     private MyOper oper;
104
105     private int numStart;
106     private int numEnd;
107
108     private Instant tstart;
109
110     private OperationOutcome opstart;
111     private OperationOutcome opend;
112
113     private OperatorConfig config;
114
115     /**
116      * Attaches the appender to the logger.
117      */
118     @BeforeClass
119     public static void setUpBeforeClass() throws Exception {
120         /**
121          * Attach appender to the logger.
122          */
123         appender.setContext(logger.getLoggerContext());
124         appender.start();
125
126         logger.addAppender(appender);
127     }
128
129     /**
130      * Stops the appender.
131      */
132     @AfterClass
133     public static void tearDownAfterClass() {
134         appender.stop();
135     }
136
137     /**
138      * Initializes the fields, including {@link #oper}.
139      */
140     @Before
141     public void setUp() {
142         event = new VirtualControlLoopEvent();
143         event.setRequestId(REQ_ID);
144
145         context = new ControlLoopEventContext(event);
146         executor = new PseudoExecutor();
147
148         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
149                         .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
150                         .startCallback(this::starter).targetEntity(MY_SINK).build();
151
152         config = new OperatorConfig(executor);
153
154         oper = new MyOper();
155
156         tstart = null;
157
158         opstart = null;
159         opend = null;
160     }
161
162     @Test
163     public void testOperatorPartial_testGetActorName_testGetName() {
164         assertEquals(ACTOR, oper.getActorName());
165         assertEquals(OPERATION, oper.getName());
166         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
167     }
168
169     @Test
170     public void testGetBlockingThread() throws Exception {
171         CompletableFuture<Void> future = new CompletableFuture<>();
172
173         // use the real executor
174         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
175             @Override
176             public Operation buildOperation(ControlLoopOperationParams params) {
177                 return null;
178             }
179         };
180
181         oper2.getBlockingExecutor().execute(() -> future.complete(null));
182
183         assertNull(future.get(5, TimeUnit.SECONDS));
184     }
185
186     @Test
187     public void testStart() {
188         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
189     }
190
191     /**
192      * Tests startOperation() when the operation has a preprocessor.
193      */
194     @Test
195     public void testStartWithPreprocessor() {
196         AtomicInteger count = new AtomicInteger();
197
198         CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
199             count.incrementAndGet();
200             return makeSuccess();
201         }, executor);
202
203         oper.setGuard(preproc);
204
205         verifyRun("testStartWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
206
207         assertEquals(1, count.get());
208     }
209
210     /**
211      * Tests start() with multiple running requests.
212      */
213     @Test
214     public void testStartMultiple() {
215         for (int count = 0; count < MAX_PARALLEL; ++count) {
216             oper.start();
217         }
218
219         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
220
221         assertNotNull(opstart);
222         assertNotNull(opend);
223         assertEquals(PolicyResult.SUCCESS, opend.getResult());
224
225         assertEquals(MAX_PARALLEL, numStart);
226         assertEquals(MAX_PARALLEL, oper.getCount());
227         assertEquals(MAX_PARALLEL, numEnd);
228     }
229
230     /**
231      * Tests startPreprocessor() when the preprocessor returns a failure.
232      */
233     @Test
234     public void testStartPreprocessorFailure() {
235         oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
236
237         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
238     }
239
240     /**
241      * Tests startPreprocessor() when the preprocessor throws an exception.
242      */
243     @Test
244     public void testStartPreprocessorException() {
245         // arrange for the preprocessor to throw an exception
246         oper.setGuard(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
247
248         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
249     }
250
251     /**
252      * Tests startPreprocessor() when the pipeline is not running.
253      */
254     @Test
255     public void testStartPreprocessorNotRunning() {
256         // arrange for the preprocessor to return success, which will be ignored
257         oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
258
259         oper.start().cancel(false);
260         assertTrue(executor.runAll(MAX_REQUESTS));
261
262         assertNull(opstart);
263         assertNull(opend);
264
265         assertEquals(0, numStart);
266         assertEquals(0, oper.getCount());
267         assertEquals(0, numEnd);
268     }
269
270     /**
271      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
272      */
273     @Test
274     public void testStartPreprocessorBuilderException() {
275         oper = new MyOper() {
276             @Override
277             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
278                 throw new IllegalStateException(EXPECTED_EXCEPTION);
279             }
280         };
281
282         assertThatIllegalStateException().isThrownBy(() -> oper.start());
283
284         // should be nothing in the queue
285         assertEquals(0, executor.getQueueLength());
286     }
287
288     @Test
289     public void testStartPreprocessorAsync() {
290         assertNull(oper.startPreprocessorAsync());
291     }
292
293     @Test
294     public void testStartGuardAsync() {
295         assertNull(oper.startGuardAsync());
296     }
297
298     @Test
299     public void testStartOperationAsync() {
300         oper.start();
301         assertTrue(executor.runAll(MAX_REQUESTS));
302
303         assertEquals(1, oper.getCount());
304     }
305
306     @Test
307     public void testIsSuccess() {
308         OperationOutcome outcome = new OperationOutcome();
309
310         outcome.setResult(PolicyResult.SUCCESS);
311         assertTrue(oper.isSuccess(outcome));
312
313         for (PolicyResult failure : FAILURE_RESULTS) {
314             outcome.setResult(failure);
315             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
316         }
317     }
318
319     @Test
320     public void testIsActorFailed() {
321         assertFalse(oper.isActorFailed(null));
322
323         OperationOutcome outcome = params.makeOutcome();
324
325         // incorrect outcome
326         outcome.setResult(PolicyResult.SUCCESS);
327         assertFalse(oper.isActorFailed(outcome));
328
329         outcome.setResult(PolicyResult.FAILURE_RETRIES);
330         assertFalse(oper.isActorFailed(outcome));
331
332         // correct outcome
333         outcome.setResult(PolicyResult.FAILURE);
334
335         // incorrect actor
336         outcome.setActor(MY_SINK);
337         assertFalse(oper.isActorFailed(outcome));
338         outcome.setActor(null);
339         assertFalse(oper.isActorFailed(outcome));
340         outcome.setActor(ACTOR);
341
342         // incorrect operation
343         outcome.setOperation(MY_SINK);
344         assertFalse(oper.isActorFailed(outcome));
345         outcome.setOperation(null);
346         assertFalse(oper.isActorFailed(outcome));
347         outcome.setOperation(OPERATION);
348
349         // correct values
350         assertTrue(oper.isActorFailed(outcome));
351     }
352
353     @Test
354     public void testDoOperation() {
355         /*
356          * Use an operation that doesn't override doOperation().
357          */
358         OperationPartial oper2 = new OperationPartial(params, config) {};
359
360         oper2.start();
361         assertTrue(executor.runAll(MAX_REQUESTS));
362
363         assertNotNull(opend);
364         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
365     }
366
367     @Test
368     public void testTimeout() throws Exception {
369
370         // use a real executor
371         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
372
373         // trigger timeout very quickly
374         oper = new MyOper() {
375             @Override
376             protected long getTimeoutMs(Integer timeoutSec) {
377                 return 1;
378             }
379
380             @Override
381             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
382
383                 OperationOutcome outcome2 = params.makeOutcome();
384                 outcome2.setResult(PolicyResult.SUCCESS);
385
386                 /*
387                  * Create an incomplete future that will timeout after the operation's
388                  * timeout. If it fires before the other timer, then it will return a
389                  * SUCCESS outcome.
390                  */
391                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
392                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
393                                 params.getExecutor());
394
395                 return future;
396             }
397         };
398
399         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
400     }
401
402     /**
403      * Tests retry functions, when the count is set to zero and retries are exhausted.
404      */
405     @Test
406     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
407         params = params.toBuilder().retry(0).build();
408
409         // new params, thus need a new operation
410         oper = new MyOper();
411
412         oper.setMaxFailures(10);
413
414         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
415     }
416
417     /**
418      * Tests retry functions, when the count is null and retries are exhausted.
419      */
420     @Test
421     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
422         params = params.toBuilder().retry(null).build();
423
424         // new params, thus need a new operation
425         oper = new MyOper();
426
427         oper.setMaxFailures(10);
428
429         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
430     }
431
432     /**
433      * Tests retry functions, when retries are exhausted.
434      */
435     @Test
436     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
437         final int maxRetries = 3;
438         params = params.toBuilder().retry(maxRetries).build();
439
440         // new params, thus need a new operation
441         oper = new MyOper();
442
443         oper.setMaxFailures(10);
444
445         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
446                         PolicyResult.FAILURE_RETRIES);
447     }
448
449     /**
450      * Tests retry functions, when a success follows some retries.
451      */
452     @Test
453     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
454         params = params.toBuilder().retry(10).build();
455
456         // new params, thus need a new operation
457         oper = new MyOper();
458
459         final int maxFailures = 3;
460         oper.setMaxFailures(maxFailures);
461
462         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
463                         PolicyResult.SUCCESS);
464     }
465
466     /**
467      * Tests retry functions, when the outcome is {@code null}.
468      */
469     @Test
470     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
471
472         // arrange to return null from doOperation()
473         oper = new MyOper() {
474             @Override
475             protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
476
477                 // update counters
478                 super.doOperation(attempt, operation);
479                 return null;
480             }
481         };
482
483         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
484     }
485
486     @Test
487     public void testSleep() throws Exception {
488         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
489         assertTrue(future.isDone());
490         assertNull(future.get());
491
492         // edge case
493         future = oper.sleep(0, TimeUnit.SECONDS);
494         assertTrue(future.isDone());
495         assertNull(future.get());
496
497         /*
498          * Start a second sleep we can use to check the first while it's running.
499          */
500         tstart = Instant.now();
501         future = oper.sleep(100, TimeUnit.MILLISECONDS);
502
503         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
504
505         // wait for second to complete and verify that the first has not completed
506         future2.get();
507         assertFalse(future.isDone());
508
509         // wait for second to complete
510         future.get();
511
512         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
513         assertTrue(diff >= 99);
514     }
515
516     @Test
517     public void testIsSameOperation() {
518         assertFalse(oper.isSameOperation(null));
519
520         OperationOutcome outcome = params.makeOutcome();
521
522         // wrong actor - should be false
523         outcome.setActor(null);
524         assertFalse(oper.isSameOperation(outcome));
525         outcome.setActor(MY_SINK);
526         assertFalse(oper.isSameOperation(outcome));
527         outcome.setActor(ACTOR);
528
529         // wrong operation - should be null
530         outcome.setOperation(null);
531         assertFalse(oper.isSameOperation(outcome));
532         outcome.setOperation(MY_SINK);
533         assertFalse(oper.isSameOperation(outcome));
534         outcome.setOperation(OPERATION);
535
536         assertTrue(oper.isSameOperation(outcome));
537     }
538
539     /**
540      * Tests handleFailure() when the outcome is a success.
541      */
542     @Test
543     public void testHandlePreprocessorFailureTrue() {
544         oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
545         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
546     }
547
548     /**
549      * Tests handleFailure() when the outcome is <i>not</i> a success.
550      */
551     @Test
552     public void testHandlePreprocessorFailureFalse() throws Exception {
553         oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
554         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
555     }
556
557     /**
558      * Tests handleFailure() when the outcome is {@code null}.
559      */
560     @Test
561     public void testHandlePreprocessorFailureNull() throws Exception {
562         // arrange to return null from the preprocessor
563         oper.setGuard(CompletableFuture.completedFuture(null));
564
565         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
566     }
567
568     @Test
569     public void testFromException() {
570         // arrange to generate an exception when operation runs
571         oper.setGenException(true);
572
573         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
574     }
575
576     /**
577      * Tests fromException() when there is no exception.
578      */
579     @Test
580     public void testFromExceptionNoExcept() {
581         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
582     }
583
584     /**
585      * Tests both flavors of anyOf(), because one invokes the other.
586      */
587     @Test
588     public void testAnyOf() throws Exception {
589         // first task completes, others do not
590         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
591
592         final OperationOutcome outcome = params.makeOutcome();
593
594         tasks.add(() -> CompletableFuture.completedFuture(outcome));
595         tasks.add(() -> new CompletableFuture<>());
596         tasks.add(() -> null);
597         tasks.add(() -> new CompletableFuture<>());
598
599         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
600         assertTrue(executor.runAll(MAX_REQUESTS));
601         assertTrue(result.isDone());
602         assertSame(outcome, result.get());
603
604         // repeat using array form
605         @SuppressWarnings("unchecked")
606         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
607         result = oper.anyOf(tasks.toArray(taskArray));
608         assertTrue(executor.runAll(MAX_REQUESTS));
609         assertTrue(result.isDone());
610         assertSame(outcome, result.get());
611
612         // second task completes, others do not
613         tasks.clear();
614         tasks.add(() -> new CompletableFuture<>());
615         tasks.add(() -> CompletableFuture.completedFuture(outcome));
616         tasks.add(() -> new CompletableFuture<>());
617
618         result = oper.anyOf(tasks);
619         assertTrue(executor.runAll(MAX_REQUESTS));
620         assertTrue(result.isDone());
621         assertSame(outcome, result.get());
622
623         // third task completes, others do not
624         tasks.clear();
625         tasks.add(() -> new CompletableFuture<>());
626         tasks.add(() -> new CompletableFuture<>());
627         tasks.add(() -> CompletableFuture.completedFuture(outcome));
628
629         result = oper.anyOf(tasks);
630         assertTrue(executor.runAll(MAX_REQUESTS));
631         assertTrue(result.isDone());
632         assertSame(outcome, result.get());
633     }
634
635     /**
636      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
637      */
638     @Test
639     @SuppressWarnings("unchecked")
640     public void testAnyOfEdge() throws Exception {
641         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
642
643         // zero items: check both using a list and using an array
644         assertNull(oper.anyOf(tasks));
645         assertNull(oper.anyOf());
646
647         // one item: : check both using a list and using an array
648         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
649         tasks.add(() -> future1);
650
651         assertSame(future1, oper.anyOf(tasks));
652         assertSame(future1, oper.anyOf(() -> future1));
653     }
654
655     @Test
656     public void testAllOfArray() throws Exception {
657         final OperationOutcome outcome = params.makeOutcome();
658
659         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
660         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
661         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
662
663         @SuppressWarnings("unchecked")
664         CompletableFuture<OperationOutcome> result =
665                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
666
667         assertTrue(executor.runAll(MAX_REQUESTS));
668         assertFalse(result.isDone());
669         future1.complete(outcome);
670
671         // complete 3 before 2
672         assertTrue(executor.runAll(MAX_REQUESTS));
673         assertFalse(result.isDone());
674         future3.complete(outcome);
675
676         assertTrue(executor.runAll(MAX_REQUESTS));
677         assertFalse(result.isDone());
678         future2.complete(outcome);
679
680         // all of them are now done
681         assertTrue(executor.runAll(MAX_REQUESTS));
682         assertTrue(result.isDone());
683         assertSame(outcome, result.get());
684     }
685
686     @Test
687     public void testAllOfList() throws Exception {
688         final OperationOutcome outcome = params.makeOutcome();
689
690         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
691         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
692         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
693
694         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
695         tasks.add(() -> future1);
696         tasks.add(() -> future2);
697         tasks.add(() -> null);
698         tasks.add(() -> future3);
699
700         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
701
702         assertTrue(executor.runAll(MAX_REQUESTS));
703         assertFalse(result.isDone());
704         future1.complete(outcome);
705
706         // complete 3 before 2
707         assertTrue(executor.runAll(MAX_REQUESTS));
708         assertFalse(result.isDone());
709         future3.complete(outcome);
710
711         assertTrue(executor.runAll(MAX_REQUESTS));
712         assertFalse(result.isDone());
713         future2.complete(outcome);
714
715         // all of them are now done
716         assertTrue(executor.runAll(MAX_REQUESTS));
717         assertTrue(result.isDone());
718         assertSame(outcome, result.get());
719     }
720
721     /**
722      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
723      */
724     @Test
725     @SuppressWarnings("unchecked")
726     public void testAllOfEdge() throws Exception {
727         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
728
729         // zero items: check both using a list and using an array
730         assertNull(oper.allOf(tasks));
731         assertNull(oper.allOf());
732
733         // one item: : check both using a list and using an array
734         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
735         tasks.add(() -> future1);
736
737         assertSame(future1, oper.allOf(tasks));
738         assertSame(future1, oper.allOf(() -> future1));
739     }
740
741     @Test
742     public void testAttachFutures() throws Exception {
743         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
744
745         // third task throws an exception during construction
746         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
747         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
748         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
749         tasks.add(() -> future1);
750         tasks.add(() -> future2);
751         tasks.add(() -> {
752             throw new IllegalStateException(EXPECTED_EXCEPTION);
753         });
754         tasks.add(() -> future3);
755
756         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
757
758         // should have canceled the first two, but not the last
759         assertTrue(future1.isCancelled());
760         assertTrue(future2.isCancelled());
761         assertFalse(future3.isCancelled());
762     }
763
764     @Test
765     public void testCombineOutcomes() throws Exception {
766         // only one outcome
767         verifyOutcomes(0, PolicyResult.SUCCESS);
768         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
769
770         // maximum is in different positions
771         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
772         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
773         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
774
775         // null outcome - takes precedence over a success
776         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
777         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
778         tasks.add(() -> CompletableFuture.completedFuture(null));
779         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
780         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
781
782         assertTrue(executor.runAll(MAX_REQUESTS));
783         assertTrue(result.isDone());
784         assertNull(result.get());
785
786         // one throws an exception during execution
787         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
788
789         tasks.clear();
790         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
791         tasks.add(() -> CompletableFuture.failedFuture(except));
792         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
793         result = oper.allOf(tasks);
794
795         assertTrue(executor.runAll(MAX_REQUESTS));
796         assertTrue(result.isCompletedExceptionally());
797         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
798     }
799
800     /**
801      * Tests both flavors of sequence(), because one invokes the other.
802      */
803     @Test
804     public void testSequence() throws Exception {
805         final OperationOutcome outcome = params.makeOutcome();
806
807         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
808         tasks.add(() -> CompletableFuture.completedFuture(outcome));
809         tasks.add(() -> null);
810         tasks.add(() -> CompletableFuture.completedFuture(outcome));
811         tasks.add(() -> CompletableFuture.completedFuture(outcome));
812
813         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
814         assertTrue(executor.runAll(MAX_REQUESTS));
815         assertTrue(result.isDone());
816         assertSame(outcome, result.get());
817
818         // repeat using array form
819         @SuppressWarnings("unchecked")
820         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
821         result = oper.sequence(tasks.toArray(taskArray));
822         assertTrue(executor.runAll(MAX_REQUESTS));
823         assertTrue(result.isDone());
824         assertSame(outcome, result.get());
825
826         // second task fails, third should not run
827         OperationOutcome failure = params.makeOutcome();
828         failure.setResult(PolicyResult.FAILURE);
829         tasks.clear();
830         tasks.add(() -> CompletableFuture.completedFuture(outcome));
831         tasks.add(() -> CompletableFuture.completedFuture(failure));
832         tasks.add(() -> CompletableFuture.completedFuture(outcome));
833
834         result = oper.sequence(tasks);
835         assertTrue(executor.runAll(MAX_REQUESTS));
836         assertTrue(result.isDone());
837         assertSame(failure, result.get());
838     }
839
840     /**
841      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
842      */
843     @Test
844     @SuppressWarnings("unchecked")
845     public void testSequenceEdge() throws Exception {
846         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
847
848         // zero items: check both using a list and using an array
849         assertNull(oper.sequence(tasks));
850         assertNull(oper.sequence());
851
852         // one item: : check both using a list and using an array
853         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
854         tasks.add(() -> future1);
855
856         assertSame(future1, oper.sequence(tasks));
857         assertSame(future1, oper.sequence(() -> future1));
858     }
859
860     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
861         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
862
863         OperationOutcome expectedOutcome = null;
864
865         for (int count = 0; count < results.length; ++count) {
866             OperationOutcome outcome = params.makeOutcome();
867             outcome.setResult(results[count]);
868             tasks.add(() -> CompletableFuture.completedFuture(outcome));
869
870             if (count == expected) {
871                 expectedOutcome = outcome;
872             }
873         }
874
875         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
876
877         assertTrue(executor.runAll(MAX_REQUESTS));
878         assertTrue(result.isDone());
879         assertSame(expectedOutcome, result.get());
880     }
881
882     @Test
883     public void testDetmPriority() throws CoderException {
884         assertEquals(1, oper.detmPriority(null));
885
886         OperationOutcome outcome = params.makeOutcome();
887
888         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
889                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
890                         PolicyResult.FAILURE_EXCEPTION, 6);
891
892         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
893             outcome.setResult(ent.getKey());
894             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
895         }
896
897         /*
898          * Test null result. We can't actually set it to null, because the set() method
899          * won't allow it. Instead, we decode it from a structure.
900          */
901         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
902         assertEquals(1, oper.detmPriority(outcome));
903     }
904
905     /**
906      * Tests callbackStarted() when the pipeline has already been stopped.
907      */
908     @Test
909     public void testCallbackStartedNotRunning() {
910         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
911
912         /*
913          * arrange to stop the controller when the start-callback is invoked, but capture
914          * the outcome
915          */
916         params = params.toBuilder().startCallback(oper -> {
917             starter(oper);
918             future.get().cancel(false);
919         }).build();
920
921         // new params, thus need a new operation
922         oper = new MyOper();
923
924         future.set(oper.start());
925         assertTrue(executor.runAll(MAX_REQUESTS));
926
927         // should have only run once
928         assertEquals(1, numStart);
929     }
930
931     /**
932      * Tests callbackCompleted() when the pipeline has already been stopped.
933      */
934     @Test
935     public void testCallbackCompletedNotRunning() {
936         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
937
938         // arrange to stop the controller when the start-callback is invoked
939         params = params.toBuilder().startCallback(oper -> {
940             future.get().cancel(false);
941         }).build();
942
943         // new params, thus need a new operation
944         oper = new MyOper();
945
946         future.set(oper.start());
947         assertTrue(executor.runAll(MAX_REQUESTS));
948
949         // should not have been set
950         assertNull(opend);
951         assertEquals(0, numEnd);
952     }
953
954     @Test
955     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
956         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
957
958         OperationOutcome outcome;
959
960         outcome = new OperationOutcome();
961         oper.setOutcome(outcome, timex);
962         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
963         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
964
965         outcome = new OperationOutcome();
966         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
967         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
968         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
969     }
970
971     @Test
972     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
973         OperationOutcome outcome;
974
975         outcome = new OperationOutcome();
976         oper.setOutcome(outcome, PolicyResult.SUCCESS);
977         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
978         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
979
980         for (PolicyResult result : FAILURE_RESULTS) {
981             outcome = new OperationOutcome();
982             oper.setOutcome(outcome, result);
983             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
984             assertEquals(result.toString(), result, outcome.getResult());
985         }
986     }
987
988     @Test
989     public void testIsTimeout() {
990         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
991
992         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
993         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
994         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
995         assertFalse(oper.isTimeout(new CompletionException(null)));
996         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
997
998         assertTrue(oper.isTimeout(timex));
999         assertTrue(oper.isTimeout(new CompletionException(timex)));
1000     }
1001
1002     @Test
1003     public void testLogMessage() {
1004         final String infraStr = SINK_INFRA.toString();
1005
1006         // log structured data
1007         appender.clearExtractions();
1008         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1009         List<String> output = appender.getExtracted();
1010         assertEquals(1, output.size());
1011
1012         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1013                         .contains("{\n  \"text\": \"my-text\"\n}");
1014
1015         // repeat with a response
1016         appender.clearExtractions();
1017         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1018         output = appender.getExtracted();
1019         assertEquals(1, output.size());
1020
1021         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1022                         .contains("{\n  \"text\": \"my-text\"\n}");
1023
1024         // log a plain string
1025         appender.clearExtractions();
1026         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1027         output = appender.getExtracted();
1028         assertEquals(1, output.size());
1029         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1030
1031         // log a null request
1032         appender.clearExtractions();
1033         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1034         output = appender.getExtracted();
1035         assertEquals(1, output.size());
1036
1037         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1038
1039         // generate exception from coder
1040         setOperCoderException();
1041
1042         appender.clearExtractions();
1043         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1044         output = appender.getExtracted();
1045         assertEquals(2, output.size());
1046         assertThat(output.get(0)).contains("cannot pretty-print request");
1047         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1048
1049         // repeat with a response
1050         appender.clearExtractions();
1051         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1052         output = appender.getExtracted();
1053         assertEquals(2, output.size());
1054         assertThat(output.get(0)).contains("cannot pretty-print response");
1055         assertThat(output.get(1)).contains(MY_SOURCE);
1056     }
1057
1058     @Test
1059     public void testGetRetry() {
1060         assertEquals(0, oper.getRetry(null));
1061         assertEquals(10, oper.getRetry(10));
1062     }
1063
1064     @Test
1065     public void testGetRetryWait() {
1066         // need an operator that doesn't override the retry time
1067         OperationPartial oper2 = new OperationPartial(params, config) {};
1068         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1069     }
1070
1071     @Test
1072     public void testGetTimeOutMs() {
1073         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1074
1075         params = params.toBuilder().timeoutSec(null).build();
1076
1077         // new params, thus need a new operation
1078         oper = new MyOper();
1079
1080         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1081     }
1082
1083     private void starter(OperationOutcome oper) {
1084         ++numStart;
1085         tstart = oper.getStart();
1086         opstart = oper;
1087     }
1088
1089     private void completer(OperationOutcome oper) {
1090         ++numEnd;
1091         opend = oper;
1092     }
1093
1094     /**
1095      * Gets a function that does nothing.
1096      *
1097      * @param <T> type of input parameter expected by the function
1098      * @return a function that does nothing
1099      */
1100     private <T> Consumer<T> noop() {
1101         return unused -> {
1102         };
1103     }
1104
1105     private OperationOutcome makeSuccess() {
1106         OperationOutcome outcome = params.makeOutcome();
1107         outcome.setResult(PolicyResult.SUCCESS);
1108
1109         return outcome;
1110     }
1111
1112     private OperationOutcome makeFailure() {
1113         OperationOutcome outcome = params.makeOutcome();
1114         outcome.setResult(PolicyResult.FAILURE);
1115
1116         return outcome;
1117     }
1118
1119     /**
1120      * Verifies a run.
1121      *
1122      * @param testName test name
1123      * @param expectedCallbacks number of callbacks expected
1124      * @param expectedOperations number of operation invocations expected
1125      * @param expectedResult expected outcome
1126      */
1127     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1128                     PolicyResult expectedResult) {
1129
1130         String expectedSubRequestId =
1131                         (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1132
1133         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1134     }
1135
1136     /**
1137      * Verifies a run.
1138      *
1139      * @param testName test name
1140      * @param expectedCallbacks number of callbacks expected
1141      * @param expectedOperations number of operation invocations expected
1142      * @param expectedResult expected outcome
1143      * @param expectedSubRequestId expected sub request ID
1144      * @param manipulator function to modify the future returned by
1145      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1146      *        in the executor are run
1147      */
1148     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1149                     String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1150
1151         CompletableFuture<OperationOutcome> future = oper.start();
1152
1153         manipulator.accept(future);
1154
1155         assertTrue(testName, executor.runAll(MAX_REQUESTS));
1156
1157         assertEquals(testName, expectedCallbacks, numStart);
1158         assertEquals(testName, expectedCallbacks, numEnd);
1159
1160         if (expectedCallbacks > 0) {
1161             assertNotNull(testName, opstart);
1162             assertNotNull(testName, opend);
1163             assertEquals(testName, expectedResult, opend.getResult());
1164
1165             assertSame(testName, tstart, opstart.getStart());
1166             assertSame(testName, tstart, opend.getStart());
1167
1168             try {
1169                 assertTrue(future.isDone());
1170                 assertSame(testName, opend, future.get());
1171
1172             } catch (InterruptedException | ExecutionException e) {
1173                 throw new IllegalStateException(e);
1174             }
1175
1176             if (expectedOperations > 0) {
1177                 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1178             }
1179         }
1180
1181         assertEquals(testName, expectedOperations, oper.getCount());
1182     }
1183
1184     /**
1185      * Creates a new {@link #oper} whose coder will throw an exception.
1186      */
1187     private void setOperCoderException() {
1188         oper = new MyOper() {
1189             @Override
1190             protected Coder makeCoder() {
1191                 return new StandardCoder() {
1192                     @Override
1193                     public String encode(Object object, boolean pretty) throws CoderException {
1194                         throw new CoderException(EXPECTED_EXCEPTION);
1195                     }
1196                 };
1197             }
1198         };
1199     }
1200
1201
1202     @Getter
1203     public static class MyData {
1204         private String text = TEXT;
1205     }
1206
1207
1208     private class MyOper extends OperationPartial {
1209         @Getter
1210         private int count = 0;
1211
1212         @Setter
1213         private boolean genException;
1214
1215         @Setter
1216         private int maxFailures = 0;
1217
1218         @Setter
1219         private CompletableFuture<OperationOutcome> guard;
1220
1221
1222         public MyOper() {
1223             super(OperationPartialTest.this.params, config);
1224         }
1225
1226         @Override
1227         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1228             ++count;
1229             if (genException) {
1230                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1231             }
1232
1233             operation.setSubRequestId(String.valueOf(attempt));
1234
1235             if (count > maxFailures) {
1236                 operation.setResult(PolicyResult.SUCCESS);
1237             } else {
1238                 operation.setResult(PolicyResult.FAILURE);
1239             }
1240
1241             return operation;
1242         }
1243
1244         @Override
1245         protected CompletableFuture<OperationOutcome> startGuardAsync() {
1246             return (guard != null ? guard : super.startGuardAsync());
1247         }
1248
1249         @Override
1250         protected long getRetryWaitMs() {
1251             /*
1252              * Sleep timers run in the background, but we want to control things via the
1253              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1254              */
1255             return 0L;
1256         }
1257     }
1258 }