Use BidirectionalTopicClient from policy-common
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32
33 import ch.qos.logback.classic.Logger;
34 import java.time.Instant;
35 import java.util.Arrays;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Map.Entry;
40 import java.util.UUID;
41 import java.util.concurrent.CompletableFuture;
42 import java.util.concurrent.CompletionException;
43 import java.util.concurrent.ExecutionException;
44 import java.util.concurrent.Executor;
45 import java.util.concurrent.ForkJoinPool;
46 import java.util.concurrent.Future;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import java.util.concurrent.atomic.AtomicInteger;
50 import java.util.concurrent.atomic.AtomicReference;
51 import java.util.function.Consumer;
52 import java.util.function.Supplier;
53 import java.util.stream.Collectors;
54 import lombok.Getter;
55 import lombok.Setter;
56 import org.junit.AfterClass;
57 import org.junit.Before;
58 import org.junit.BeforeClass;
59 import org.junit.Test;
60 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
61 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
62 import org.onap.policy.common.utils.coder.Coder;
63 import org.onap.policy.common.utils.coder.CoderException;
64 import org.onap.policy.common.utils.coder.StandardCoder;
65 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
66 import org.onap.policy.common.utils.time.PseudoExecutor;
67 import org.onap.policy.controlloop.ControlLoopOperation;
68 import org.onap.policy.controlloop.VirtualControlLoopEvent;
69 import org.onap.policy.controlloop.actorserviceprovider.Operation;
70 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
71 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
72 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
73 import org.onap.policy.controlloop.policy.PolicyResult;
74 import org.slf4j.LoggerFactory;
75
76 public class OperationPartialTest {
77     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
78     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
79     private static final int MAX_REQUESTS = 100;
80     private static final int MAX_PARALLEL = 10;
81     private static final String EXPECTED_EXCEPTION = "expected exception";
82     private static final String ACTOR = "my-actor";
83     private static final String OPERATION = "my-operation";
84     private static final String MY_SINK = "my-sink";
85     private static final String MY_SOURCE = "my-source";
86     private static final String TEXT = "my-text";
87     private static final int TIMEOUT = 1000;
88     private static final UUID REQ_ID = UUID.randomUUID();
89
90     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
91                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
92
93     /**
94      * Used to attach an appender to the class' logger.
95      */
96     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
97     private static final ExtractAppender appender = new ExtractAppender();
98
99     private VirtualControlLoopEvent event;
100     private ControlLoopEventContext context;
101     private PseudoExecutor executor;
102     private ControlLoopOperationParams params;
103
104     private MyOper oper;
105
106     private int numStart;
107     private int numEnd;
108
109     private Instant tstart;
110
111     private OperationOutcome opstart;
112     private OperationOutcome opend;
113
114     private OperatorPartial operator;
115
116     /**
117      * Attaches the appender to the logger.
118      */
119     @BeforeClass
120     public static void setUpBeforeClass() throws Exception {
121         /**
122          * Attach appender to the logger.
123          */
124         appender.setContext(logger.getLoggerContext());
125         appender.start();
126
127         logger.addAppender(appender);
128     }
129
130     /**
131      * Stops the appender.
132      */
133     @AfterClass
134     public static void tearDownAfterClass() {
135         appender.stop();
136     }
137
138     /**
139      * Initializes the fields, including {@link #oper}.
140      */
141     @Before
142     public void setUp() {
143         event = new VirtualControlLoopEvent();
144         event.setRequestId(REQ_ID);
145
146         context = new ControlLoopEventContext(event);
147         executor = new PseudoExecutor();
148
149         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
150                         .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
151                         .startCallback(this::starter).targetEntity(MY_SINK).build();
152
153         operator = new OperatorPartial(ACTOR, OPERATION) {
154             @Override
155             public Executor getBlockingExecutor() {
156                 return executor;
157             }
158
159             @Override
160             public Operation buildOperation(ControlLoopOperationParams params) {
161                 return null;
162             }
163         };
164
165         operator.configure(null);
166         operator.start();
167
168         oper = new MyOper();
169
170         tstart = null;
171
172         opstart = null;
173         opend = null;
174     }
175
176     @Test
177     public void testOperatorPartial_testGetActorName_testGetName() {
178         assertEquals(ACTOR, oper.getActorName());
179         assertEquals(OPERATION, oper.getName());
180         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
181     }
182
183     @Test
184     public void testGetBlockingThread() throws Exception {
185         CompletableFuture<Void> future = new CompletableFuture<>();
186
187         // use the real executor
188         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
189             @Override
190             public Operation buildOperation(ControlLoopOperationParams params) {
191                 return null;
192             }
193         };
194
195         oper2.getBlockingExecutor().execute(() -> future.complete(null));
196
197         assertNull(future.get(5, TimeUnit.SECONDS));
198     }
199
200     /**
201      * Exercises the doXxx() methods.
202      */
203     @Test
204     public void testDoXxx() {
205         assertThatCode(() -> operator.doConfigure(null)).doesNotThrowAnyException();
206         assertThatCode(() -> operator.doStart()).doesNotThrowAnyException();
207         assertThatCode(() -> operator.doStop()).doesNotThrowAnyException();
208         assertThatCode(() -> operator.doShutdown()).doesNotThrowAnyException();
209
210     }
211
212     @Test
213     public void testStart() {
214         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
215     }
216
217     /**
218      * Tests startOperation() when the operator is not running.
219      */
220     @Test
221     public void testStartNotRunning() {
222         // stop the operator
223         operator.stop();
224
225         assertThatIllegalStateException().isThrownBy(() -> oper.start());
226     }
227
228     /**
229      * Tests startOperation() when the operation has a preprocessor.
230      */
231     @Test
232     public void testStartWithPreprocessor() {
233         AtomicInteger count = new AtomicInteger();
234
235         CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
236             count.incrementAndGet();
237             return makeSuccess();
238         }, executor);
239
240         oper.setGuard(preproc);
241
242         verifyRun("testStartWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
243
244         assertEquals(1, count.get());
245     }
246
247     /**
248      * Tests start() with multiple running requests.
249      */
250     @Test
251     public void testStartMultiple() {
252         for (int count = 0; count < MAX_PARALLEL; ++count) {
253             oper.start();
254         }
255
256         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
257
258         assertNotNull(opstart);
259         assertNotNull(opend);
260         assertEquals(PolicyResult.SUCCESS, opend.getResult());
261
262         assertEquals(MAX_PARALLEL, numStart);
263         assertEquals(MAX_PARALLEL, oper.getCount());
264         assertEquals(MAX_PARALLEL, numEnd);
265     }
266
267     /**
268      * Tests startPreprocessor() when the preprocessor returns a failure.
269      */
270     @Test
271     public void testStartPreprocessorFailure() {
272         oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
273
274         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
275     }
276
277     /**
278      * Tests startPreprocessor() when the preprocessor throws an exception.
279      */
280     @Test
281     public void testStartPreprocessorException() {
282         // arrange for the preprocessor to throw an exception
283         oper.setGuard(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
284
285         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
286     }
287
288     /**
289      * Tests startPreprocessor() when the pipeline is not running.
290      */
291     @Test
292     public void testStartPreprocessorNotRunning() {
293         // arrange for the preprocessor to return success, which will be ignored
294         oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
295
296         oper.start().cancel(false);
297         assertTrue(executor.runAll(MAX_REQUESTS));
298
299         assertNull(opstart);
300         assertNull(opend);
301
302         assertEquals(0, numStart);
303         assertEquals(0, oper.getCount());
304         assertEquals(0, numEnd);
305     }
306
307     /**
308      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
309      */
310     @Test
311     public void testStartPreprocessorBuilderException() {
312         oper = new MyOper() {
313             @Override
314             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
315                 throw new IllegalStateException(EXPECTED_EXCEPTION);
316             }
317         };
318
319         assertThatIllegalStateException().isThrownBy(() -> oper.start());
320
321         // should be nothing in the queue
322         assertEquals(0, executor.getQueueLength());
323     }
324
325     @Test
326     public void testStartPreprocessorAsync() {
327         assertNull(oper.startPreprocessorAsync());
328     }
329
330     @Test
331     public void testStartGuardAsync() {
332         assertNull(oper.startGuardAsync());
333     }
334
335     @Test
336     public void testStartOperationAsync() {
337         oper.start();
338         assertTrue(executor.runAll(MAX_REQUESTS));
339
340         assertEquals(1, oper.getCount());
341     }
342
343     @Test
344     public void testIsSuccess() {
345         OperationOutcome outcome = new OperationOutcome();
346
347         outcome.setResult(PolicyResult.SUCCESS);
348         assertTrue(oper.isSuccess(outcome));
349
350         for (PolicyResult failure : FAILURE_RESULTS) {
351             outcome.setResult(failure);
352             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
353         }
354     }
355
356     @Test
357     public void testIsActorFailed() {
358         assertFalse(oper.isActorFailed(null));
359
360         OperationOutcome outcome = params.makeOutcome();
361
362         // incorrect outcome
363         outcome.setResult(PolicyResult.SUCCESS);
364         assertFalse(oper.isActorFailed(outcome));
365
366         outcome.setResult(PolicyResult.FAILURE_RETRIES);
367         assertFalse(oper.isActorFailed(outcome));
368
369         // correct outcome
370         outcome.setResult(PolicyResult.FAILURE);
371
372         // incorrect actor
373         outcome.setActor(MY_SINK);
374         assertFalse(oper.isActorFailed(outcome));
375         outcome.setActor(null);
376         assertFalse(oper.isActorFailed(outcome));
377         outcome.setActor(ACTOR);
378
379         // incorrect operation
380         outcome.setOperation(MY_SINK);
381         assertFalse(oper.isActorFailed(outcome));
382         outcome.setOperation(null);
383         assertFalse(oper.isActorFailed(outcome));
384         outcome.setOperation(OPERATION);
385
386         // correct values
387         assertTrue(oper.isActorFailed(outcome));
388     }
389
390     @Test
391     public void testDoOperation() {
392         /*
393          * Use an operation that doesn't override doOperation().
394          */
395         OperationPartial oper2 = new OperationPartial(params, operator) {};
396
397         oper2.start();
398         assertTrue(executor.runAll(MAX_REQUESTS));
399
400         assertNotNull(opend);
401         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
402     }
403
404     @Test
405     public void testTimeout() throws Exception {
406
407         // use a real executor
408         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
409
410         // trigger timeout very quickly
411         oper = new MyOper() {
412             @Override
413             protected long getTimeoutMs(Integer timeoutSec) {
414                 return 1;
415             }
416
417             @Override
418             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
419
420                 OperationOutcome outcome2 = params.makeOutcome();
421                 outcome2.setResult(PolicyResult.SUCCESS);
422
423                 /*
424                  * Create an incomplete future that will timeout after the operation's
425                  * timeout. If it fires before the other timer, then it will return a
426                  * SUCCESS outcome.
427                  */
428                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
429                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
430                                 params.getExecutor());
431
432                 return future;
433             }
434         };
435
436         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
437     }
438
439     /**
440      * Tests retry functions, when the count is set to zero and retries are exhausted.
441      */
442     @Test
443     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
444         params = params.toBuilder().retry(0).build();
445
446         // new params, thus need a new operation
447         oper = new MyOper();
448
449         oper.setMaxFailures(10);
450
451         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
452     }
453
454     /**
455      * Tests retry functions, when the count is null and retries are exhausted.
456      */
457     @Test
458     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
459         params = params.toBuilder().retry(null).build();
460
461         // new params, thus need a new operation
462         oper = new MyOper();
463
464         oper.setMaxFailures(10);
465
466         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
467     }
468
469     /**
470      * Tests retry functions, when retries are exhausted.
471      */
472     @Test
473     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
474         final int maxRetries = 3;
475         params = params.toBuilder().retry(maxRetries).build();
476
477         // new params, thus need a new operation
478         oper = new MyOper();
479
480         oper.setMaxFailures(10);
481
482         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
483                         PolicyResult.FAILURE_RETRIES);
484     }
485
486     /**
487      * Tests retry functions, when a success follows some retries.
488      */
489     @Test
490     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
491         params = params.toBuilder().retry(10).build();
492
493         // new params, thus need a new operation
494         oper = new MyOper();
495
496         final int maxFailures = 3;
497         oper.setMaxFailures(maxFailures);
498
499         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
500                         PolicyResult.SUCCESS);
501     }
502
503     /**
504      * Tests retry functions, when the outcome is {@code null}.
505      */
506     @Test
507     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
508
509         // arrange to return null from doOperation()
510         oper = new MyOper() {
511             @Override
512             protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
513
514                 // update counters
515                 super.doOperation(attempt, operation);
516                 return null;
517             }
518         };
519
520         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
521     }
522
523     @Test
524     public void testSleep() throws Exception {
525         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
526         assertTrue(future.isDone());
527         assertNull(future.get());
528
529         // edge case
530         future = oper.sleep(0, TimeUnit.SECONDS);
531         assertTrue(future.isDone());
532         assertNull(future.get());
533
534         /*
535          * Start a second sleep we can use to check the first while it's running.
536          */
537         tstart = Instant.now();
538         future = oper.sleep(100, TimeUnit.MILLISECONDS);
539
540         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
541
542         // wait for second to complete and verify that the first has not completed
543         future2.get();
544         assertFalse(future.isDone());
545
546         // wait for second to complete
547         future.get();
548
549         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
550         assertTrue(diff >= 99);
551     }
552
553     @Test
554     public void testIsSameOperation() {
555         assertFalse(oper.isSameOperation(null));
556
557         OperationOutcome outcome = params.makeOutcome();
558
559         // wrong actor - should be false
560         outcome.setActor(null);
561         assertFalse(oper.isSameOperation(outcome));
562         outcome.setActor(MY_SINK);
563         assertFalse(oper.isSameOperation(outcome));
564         outcome.setActor(ACTOR);
565
566         // wrong operation - should be null
567         outcome.setOperation(null);
568         assertFalse(oper.isSameOperation(outcome));
569         outcome.setOperation(MY_SINK);
570         assertFalse(oper.isSameOperation(outcome));
571         outcome.setOperation(OPERATION);
572
573         assertTrue(oper.isSameOperation(outcome));
574     }
575
576     /**
577      * Tests handleFailure() when the outcome is a success.
578      */
579     @Test
580     public void testHandlePreprocessorFailureTrue() {
581         oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
582         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
583     }
584
585     /**
586      * Tests handleFailure() when the outcome is <i>not</i> a success.
587      */
588     @Test
589     public void testHandlePreprocessorFailureFalse() throws Exception {
590         oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
591         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
592     }
593
594     /**
595      * Tests handleFailure() when the outcome is {@code null}.
596      */
597     @Test
598     public void testHandlePreprocessorFailureNull() throws Exception {
599         // arrange to return null from the preprocessor
600         oper.setGuard(CompletableFuture.completedFuture(null));
601
602         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
603     }
604
605     @Test
606     public void testFromException() {
607         // arrange to generate an exception when operation runs
608         oper.setGenException(true);
609
610         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
611     }
612
613     /**
614      * Tests fromException() when there is no exception.
615      */
616     @Test
617     public void testFromExceptionNoExcept() {
618         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
619     }
620
621     /**
622      * Tests both flavors of anyOf(), because one invokes the other.
623      */
624     @Test
625     public void testAnyOf() throws Exception {
626         // first task completes, others do not
627         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
628
629         final OperationOutcome outcome = params.makeOutcome();
630
631         tasks.add(() -> CompletableFuture.completedFuture(outcome));
632         tasks.add(() -> new CompletableFuture<>());
633         tasks.add(() -> null);
634         tasks.add(() -> new CompletableFuture<>());
635
636         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
637         assertTrue(executor.runAll(MAX_REQUESTS));
638         assertTrue(result.isDone());
639         assertSame(outcome, result.get());
640
641         // repeat using array form
642         @SuppressWarnings("unchecked")
643         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
644         result = oper.anyOf(tasks.toArray(taskArray));
645         assertTrue(executor.runAll(MAX_REQUESTS));
646         assertTrue(result.isDone());
647         assertSame(outcome, result.get());
648
649         // second task completes, others do not
650         tasks.clear();
651         tasks.add(() -> new CompletableFuture<>());
652         tasks.add(() -> CompletableFuture.completedFuture(outcome));
653         tasks.add(() -> new CompletableFuture<>());
654
655         result = oper.anyOf(tasks);
656         assertTrue(executor.runAll(MAX_REQUESTS));
657         assertTrue(result.isDone());
658         assertSame(outcome, result.get());
659
660         // third task completes, others do not
661         tasks.clear();
662         tasks.add(() -> new CompletableFuture<>());
663         tasks.add(() -> new CompletableFuture<>());
664         tasks.add(() -> CompletableFuture.completedFuture(outcome));
665
666         result = oper.anyOf(tasks);
667         assertTrue(executor.runAll(MAX_REQUESTS));
668         assertTrue(result.isDone());
669         assertSame(outcome, result.get());
670     }
671
672     /**
673      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
674      */
675     @Test
676     @SuppressWarnings("unchecked")
677     public void testAnyOfEdge() throws Exception {
678         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
679
680         // zero items: check both using a list and using an array
681         assertNull(oper.anyOf(tasks));
682         assertNull(oper.anyOf());
683
684         // one item: : check both using a list and using an array
685         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
686         tasks.add(() -> future1);
687
688         assertSame(future1, oper.anyOf(tasks));
689         assertSame(future1, oper.anyOf(() -> future1));
690     }
691
692     @Test
693     public void testAllOfArray() throws Exception {
694         final OperationOutcome outcome = params.makeOutcome();
695
696         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
697         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
698         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
699
700         @SuppressWarnings("unchecked")
701         CompletableFuture<OperationOutcome> result =
702                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
703
704         assertTrue(executor.runAll(MAX_REQUESTS));
705         assertFalse(result.isDone());
706         future1.complete(outcome);
707
708         // complete 3 before 2
709         assertTrue(executor.runAll(MAX_REQUESTS));
710         assertFalse(result.isDone());
711         future3.complete(outcome);
712
713         assertTrue(executor.runAll(MAX_REQUESTS));
714         assertFalse(result.isDone());
715         future2.complete(outcome);
716
717         // all of them are now done
718         assertTrue(executor.runAll(MAX_REQUESTS));
719         assertTrue(result.isDone());
720         assertSame(outcome, result.get());
721     }
722
723     @Test
724     public void testAllOfList() throws Exception {
725         final OperationOutcome outcome = params.makeOutcome();
726
727         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
728         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
729         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
730
731         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
732         tasks.add(() -> future1);
733         tasks.add(() -> future2);
734         tasks.add(() -> null);
735         tasks.add(() -> future3);
736
737         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
738
739         assertTrue(executor.runAll(MAX_REQUESTS));
740         assertFalse(result.isDone());
741         future1.complete(outcome);
742
743         // complete 3 before 2
744         assertTrue(executor.runAll(MAX_REQUESTS));
745         assertFalse(result.isDone());
746         future3.complete(outcome);
747
748         assertTrue(executor.runAll(MAX_REQUESTS));
749         assertFalse(result.isDone());
750         future2.complete(outcome);
751
752         // all of them are now done
753         assertTrue(executor.runAll(MAX_REQUESTS));
754         assertTrue(result.isDone());
755         assertSame(outcome, result.get());
756     }
757
758     /**
759      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
760      */
761     @Test
762     @SuppressWarnings("unchecked")
763     public void testAllOfEdge() throws Exception {
764         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
765
766         // zero items: check both using a list and using an array
767         assertNull(oper.allOf(tasks));
768         assertNull(oper.allOf());
769
770         // one item: : check both using a list and using an array
771         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
772         tasks.add(() -> future1);
773
774         assertSame(future1, oper.allOf(tasks));
775         assertSame(future1, oper.allOf(() -> future1));
776     }
777
778     @Test
779     public void testAttachFutures() throws Exception {
780         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
781
782         // third task throws an exception during construction
783         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
784         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
785         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
786         tasks.add(() -> future1);
787         tasks.add(() -> future2);
788         tasks.add(() -> {
789             throw new IllegalStateException(EXPECTED_EXCEPTION);
790         });
791         tasks.add(() -> future3);
792
793         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
794
795         // should have canceled the first two, but not the last
796         assertTrue(future1.isCancelled());
797         assertTrue(future2.isCancelled());
798         assertFalse(future3.isCancelled());
799     }
800
801     @Test
802     public void testCombineOutcomes() throws Exception {
803         // only one outcome
804         verifyOutcomes(0, PolicyResult.SUCCESS);
805         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
806
807         // maximum is in different positions
808         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
809         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
810         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
811
812         // null outcome - takes precedence over a success
813         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
814         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
815         tasks.add(() -> CompletableFuture.completedFuture(null));
816         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
817         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
818
819         assertTrue(executor.runAll(MAX_REQUESTS));
820         assertTrue(result.isDone());
821         assertNull(result.get());
822
823         // one throws an exception during execution
824         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
825
826         tasks.clear();
827         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
828         tasks.add(() -> CompletableFuture.failedFuture(except));
829         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
830         result = oper.allOf(tasks);
831
832         assertTrue(executor.runAll(MAX_REQUESTS));
833         assertTrue(result.isCompletedExceptionally());
834         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
835     }
836
837     /**
838      * Tests both flavors of sequence(), because one invokes the other.
839      */
840     @Test
841     public void testSequence() throws Exception {
842         final OperationOutcome outcome = params.makeOutcome();
843
844         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
845         tasks.add(() -> CompletableFuture.completedFuture(outcome));
846         tasks.add(() -> null);
847         tasks.add(() -> CompletableFuture.completedFuture(outcome));
848         tasks.add(() -> CompletableFuture.completedFuture(outcome));
849
850         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
851         assertTrue(executor.runAll(MAX_REQUESTS));
852         assertTrue(result.isDone());
853         assertSame(outcome, result.get());
854
855         // repeat using array form
856         @SuppressWarnings("unchecked")
857         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
858         result = oper.sequence(tasks.toArray(taskArray));
859         assertTrue(executor.runAll(MAX_REQUESTS));
860         assertTrue(result.isDone());
861         assertSame(outcome, result.get());
862
863         // second task fails, third should not run
864         OperationOutcome failure = params.makeOutcome();
865         failure.setResult(PolicyResult.FAILURE);
866         tasks.clear();
867         tasks.add(() -> CompletableFuture.completedFuture(outcome));
868         tasks.add(() -> CompletableFuture.completedFuture(failure));
869         tasks.add(() -> CompletableFuture.completedFuture(outcome));
870
871         result = oper.sequence(tasks);
872         assertTrue(executor.runAll(MAX_REQUESTS));
873         assertTrue(result.isDone());
874         assertSame(failure, result.get());
875     }
876
877     /**
878      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
879      */
880     @Test
881     @SuppressWarnings("unchecked")
882     public void testSequenceEdge() throws Exception {
883         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
884
885         // zero items: check both using a list and using an array
886         assertNull(oper.sequence(tasks));
887         assertNull(oper.sequence());
888
889         // one item: : check both using a list and using an array
890         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
891         tasks.add(() -> future1);
892
893         assertSame(future1, oper.sequence(tasks));
894         assertSame(future1, oper.sequence(() -> future1));
895     }
896
897     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
898         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
899
900         OperationOutcome expectedOutcome = null;
901
902         for (int count = 0; count < results.length; ++count) {
903             OperationOutcome outcome = params.makeOutcome();
904             outcome.setResult(results[count]);
905             tasks.add(() -> CompletableFuture.completedFuture(outcome));
906
907             if (count == expected) {
908                 expectedOutcome = outcome;
909             }
910         }
911
912         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
913
914         assertTrue(executor.runAll(MAX_REQUESTS));
915         assertTrue(result.isDone());
916         assertSame(expectedOutcome, result.get());
917     }
918
919     @Test
920     public void testDetmPriority() throws CoderException {
921         assertEquals(1, oper.detmPriority(null));
922
923         OperationOutcome outcome = params.makeOutcome();
924
925         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
926                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
927                         PolicyResult.FAILURE_EXCEPTION, 6);
928
929         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
930             outcome.setResult(ent.getKey());
931             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
932         }
933
934         /*
935          * Test null result. We can't actually set it to null, because the set() method
936          * won't allow it. Instead, we decode it from a structure.
937          */
938         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
939         assertEquals(1, oper.detmPriority(outcome));
940     }
941
942     /**
943      * Tests callbackStarted() when the pipeline has already been stopped.
944      */
945     @Test
946     public void testCallbackStartedNotRunning() {
947         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
948
949         /*
950          * arrange to stop the controller when the start-callback is invoked, but capture
951          * the outcome
952          */
953         params = params.toBuilder().startCallback(oper -> {
954             starter(oper);
955             future.get().cancel(false);
956         }).build();
957
958         // new params, thus need a new operation
959         oper = new MyOper();
960
961         future.set(oper.start());
962         assertTrue(executor.runAll(MAX_REQUESTS));
963
964         // should have only run once
965         assertEquals(1, numStart);
966     }
967
968     /**
969      * Tests callbackCompleted() when the pipeline has already been stopped.
970      */
971     @Test
972     public void testCallbackCompletedNotRunning() {
973         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
974
975         // arrange to stop the controller when the start-callback is invoked
976         params = params.toBuilder().startCallback(oper -> {
977             future.get().cancel(false);
978         }).build();
979
980         // new params, thus need a new operation
981         oper = new MyOper();
982
983         future.set(oper.start());
984         assertTrue(executor.runAll(MAX_REQUESTS));
985
986         // should not have been set
987         assertNull(opend);
988         assertEquals(0, numEnd);
989     }
990
991     @Test
992     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
993         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
994
995         OperationOutcome outcome;
996
997         outcome = new OperationOutcome();
998         oper.setOutcome(outcome, timex);
999         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1000         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1001
1002         outcome = new OperationOutcome();
1003         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1004         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1005         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1006     }
1007
1008     @Test
1009     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1010         OperationOutcome outcome;
1011
1012         outcome = new OperationOutcome();
1013         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1014         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1015         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1016
1017         for (PolicyResult result : FAILURE_RESULTS) {
1018             outcome = new OperationOutcome();
1019             oper.setOutcome(outcome, result);
1020             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1021             assertEquals(result.toString(), result, outcome.getResult());
1022         }
1023     }
1024
1025     @Test
1026     public void testIsTimeout() {
1027         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1028
1029         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1030         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1031         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1032         assertFalse(oper.isTimeout(new CompletionException(null)));
1033         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1034
1035         assertTrue(oper.isTimeout(timex));
1036         assertTrue(oper.isTimeout(new CompletionException(timex)));
1037     }
1038
1039     @Test
1040     public void testLogMessage() {
1041         final String infraStr = SINK_INFRA.toString();
1042
1043         // log structured data
1044         appender.clearExtractions();
1045         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1046         List<String> output = appender.getExtracted();
1047         assertEquals(1, output.size());
1048
1049         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1050                         .contains("{\n  \"text\": \"my-text\"\n}");
1051
1052         // repeat with a response
1053         appender.clearExtractions();
1054         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1055         output = appender.getExtracted();
1056         assertEquals(1, output.size());
1057
1058         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1059                         .contains("{\n  \"text\": \"my-text\"\n}");
1060
1061         // log a plain string
1062         appender.clearExtractions();
1063         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1064         output = appender.getExtracted();
1065         assertEquals(1, output.size());
1066         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1067
1068         // log a null request
1069         appender.clearExtractions();
1070         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1071         output = appender.getExtracted();
1072         assertEquals(1, output.size());
1073
1074         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1075
1076         // generate exception from coder
1077         setOperCoderException();
1078
1079         appender.clearExtractions();
1080         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1081         output = appender.getExtracted();
1082         assertEquals(2, output.size());
1083         assertThat(output.get(0)).contains("cannot pretty-print request");
1084         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1085
1086         // repeat with a response
1087         appender.clearExtractions();
1088         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1089         output = appender.getExtracted();
1090         assertEquals(2, output.size());
1091         assertThat(output.get(0)).contains("cannot pretty-print response");
1092         assertThat(output.get(1)).contains(MY_SOURCE);
1093     }
1094
1095     @Test
1096     public void testGetRetry() {
1097         assertEquals(0, oper.getRetry(null));
1098         assertEquals(10, oper.getRetry(10));
1099     }
1100
1101     @Test
1102     public void testGetRetryWait() {
1103         // need an operator that doesn't override the retry time
1104         OperationPartial oper2 = new OperationPartial(params, operator) {};
1105         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1106     }
1107
1108     @Test
1109     public void testGetTimeOutMs() {
1110         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1111
1112         params = params.toBuilder().timeoutSec(null).build();
1113
1114         // new params, thus need a new operation
1115         oper = new MyOper();
1116
1117         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1118     }
1119
1120     private void starter(OperationOutcome oper) {
1121         ++numStart;
1122         tstart = oper.getStart();
1123         opstart = oper;
1124     }
1125
1126     private void completer(OperationOutcome oper) {
1127         ++numEnd;
1128         opend = oper;
1129     }
1130
1131     /**
1132      * Gets a function that does nothing.
1133      *
1134      * @param <T> type of input parameter expected by the function
1135      * @return a function that does nothing
1136      */
1137     private <T> Consumer<T> noop() {
1138         return unused -> {
1139         };
1140     }
1141
1142     private OperationOutcome makeSuccess() {
1143         OperationOutcome outcome = params.makeOutcome();
1144         outcome.setResult(PolicyResult.SUCCESS);
1145
1146         return outcome;
1147     }
1148
1149     private OperationOutcome makeFailure() {
1150         OperationOutcome outcome = params.makeOutcome();
1151         outcome.setResult(PolicyResult.FAILURE);
1152
1153         return outcome;
1154     }
1155
1156     /**
1157      * Verifies a run.
1158      *
1159      * @param testName test name
1160      * @param expectedCallbacks number of callbacks expected
1161      * @param expectedOperations number of operation invocations expected
1162      * @param expectedResult expected outcome
1163      */
1164     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1165                     PolicyResult expectedResult) {
1166
1167         String expectedSubRequestId =
1168                         (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1169
1170         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1171     }
1172
1173     /**
1174      * Verifies a run.
1175      *
1176      * @param testName test name
1177      * @param expectedCallbacks number of callbacks expected
1178      * @param expectedOperations number of operation invocations expected
1179      * @param expectedResult expected outcome
1180      * @param expectedSubRequestId expected sub request ID
1181      * @param manipulator function to modify the future returned by
1182      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1183      *        in the executor are run
1184      */
1185     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1186                     String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1187
1188         CompletableFuture<OperationOutcome> future = oper.start();
1189
1190         manipulator.accept(future);
1191
1192         assertTrue(testName, executor.runAll(MAX_REQUESTS));
1193
1194         assertEquals(testName, expectedCallbacks, numStart);
1195         assertEquals(testName, expectedCallbacks, numEnd);
1196
1197         if (expectedCallbacks > 0) {
1198             assertNotNull(testName, opstart);
1199             assertNotNull(testName, opend);
1200             assertEquals(testName, expectedResult, opend.getResult());
1201
1202             assertSame(testName, tstart, opstart.getStart());
1203             assertSame(testName, tstart, opend.getStart());
1204
1205             try {
1206                 assertTrue(future.isDone());
1207                 assertSame(testName, opend, future.get());
1208
1209             } catch (InterruptedException | ExecutionException e) {
1210                 throw new IllegalStateException(e);
1211             }
1212
1213             if (expectedOperations > 0) {
1214                 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1215             }
1216         }
1217
1218         assertEquals(testName, expectedOperations, oper.getCount());
1219     }
1220
1221     /**
1222      * Creates a new {@link #oper} whose coder will throw an exception.
1223      */
1224     private void setOperCoderException() {
1225         oper = new MyOper() {
1226             @Override
1227             protected Coder makeCoder() {
1228                 return new StandardCoder() {
1229                     @Override
1230                     public String encode(Object object, boolean pretty) throws CoderException {
1231                         throw new CoderException(EXPECTED_EXCEPTION);
1232                     }
1233                 };
1234             }
1235         };
1236     }
1237
1238
1239     @Getter
1240     public static class MyData {
1241         private String text = TEXT;
1242     }
1243
1244
1245     private class MyOper extends OperationPartial {
1246         @Getter
1247         private int count = 0;
1248
1249         @Setter
1250         private boolean genException;
1251
1252         @Setter
1253         private int maxFailures = 0;
1254
1255         @Setter
1256         private CompletableFuture<OperationOutcome> guard;
1257
1258
1259         public MyOper() {
1260             super(OperationPartialTest.this.params, operator);
1261         }
1262
1263         @Override
1264         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1265             ++count;
1266             if (genException) {
1267                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1268             }
1269
1270             operation.setSubRequestId(String.valueOf(attempt));
1271
1272             if (count > maxFailures) {
1273                 operation.setResult(PolicyResult.SUCCESS);
1274             } else {
1275                 operation.setResult(PolicyResult.FAILURE);
1276             }
1277
1278             return operation;
1279         }
1280
1281         @Override
1282         protected CompletableFuture<OperationOutcome> startGuardAsync() {
1283             return (guard != null ? guard : super.startGuardAsync());
1284         }
1285
1286         @Override
1287         protected long getRetryWaitMs() {
1288             /*
1289              * Sleep timers run in the background, but we want to control things via the
1290              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1291              */
1292             return 0L;
1293         }
1294     }
1295 }