57dbfb1410c0b265a15d04dfa33105f407c8f204
[policy/models.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.actorserviceprovider.impl;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertFalse;
28 import static org.junit.jupiter.api.Assertions.assertNotNull;
29 import static org.junit.jupiter.api.Assertions.assertNull;
30 import static org.junit.jupiter.api.Assertions.assertSame;
31 import static org.junit.jupiter.api.Assertions.assertTrue;
32
33 import ch.qos.logback.classic.Logger;
34 import java.time.Instant;
35 import java.util.ArrayDeque;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import java.util.Deque;
39 import java.util.LinkedList;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.Map.Entry;
43 import java.util.UUID;
44 import java.util.concurrent.CompletableFuture;
45 import java.util.concurrent.CompletionException;
46 import java.util.concurrent.ExecutionException;
47 import java.util.concurrent.ForkJoinPool;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import java.util.concurrent.atomic.AtomicReference;
52 import java.util.function.Consumer;
53 import java.util.function.Supplier;
54 import lombok.Getter;
55 import lombok.Setter;
56 import org.junit.jupiter.api.AfterAll;
57 import org.junit.jupiter.api.BeforeAll;
58 import org.junit.jupiter.api.BeforeEach;
59 import org.junit.jupiter.api.Test;
60 import org.junit.jupiter.api.extension.ExtendWith;
61 import org.junit.runner.RunWith;
62 import org.mockito.Mock;
63 import org.mockito.junit.MockitoJUnitRunner;
64 import org.mockito.junit.jupiter.MockitoExtension;
65 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
66 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
67 import org.onap.policy.common.utils.coder.Coder;
68 import org.onap.policy.common.utils.coder.CoderException;
69 import org.onap.policy.common.utils.coder.StandardCoder;
70 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
71 import org.onap.policy.common.utils.time.PseudoExecutor;
72 import org.onap.policy.controlloop.ControlLoopOperation;
73 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
74 import org.onap.policy.controlloop.actorserviceprovider.Operation;
75 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
76 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
77 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
78 import org.onap.policy.controlloop.actorserviceprovider.Operator;
79 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
80 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
81 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
82 import org.slf4j.LoggerFactory;
83
84 @ExtendWith(MockitoExtension.class)
85 class OperationPartialTest {
86     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
87     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
88     private static final int MAX_REQUESTS = 100;
89     private static final int MAX_PARALLEL = 10;
90     private static final String EXPECTED_EXCEPTION = "expected exception";
91     private static final String ACTOR = "my-actor";
92     private static final String OPERATION = "my-operation";
93     private static final String MY_SINK = "my-sink";
94     private static final String MY_SOURCE = "my-source";
95     private static final String MY_TARGET_ENTITY = "my-entity";
96     private static final String TEXT = "my-text";
97     private static final int TIMEOUT = 1000;
98     private static final UUID REQ_ID = UUID.randomUUID();
99
100     private static final List<OperationResult> FAILURE_RESULTS = Arrays.stream(OperationResult.values())
101                     .filter(result -> result != OperationResult.SUCCESS).toList();
102
103     /**
104      * Used to attach an appender to the class' logger.
105      */
106     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
107     private static final ExtractAppender appender = new ExtractAppender();
108
109     private static final List<String> PROP_NAMES = List.of("hello", "world");
110
111     @Mock
112     private ActorService service;
113     @Mock
114     private Actor guardActor;
115     @Mock
116     private Operator guardOperator;
117     @Mock
118     private Operation guardOperation;
119
120     private PseudoExecutor executor;
121     private ControlLoopOperationParams params;
122
123     private MyOper oper;
124
125     private int numStart;
126     private int numEnd;
127
128     private Instant tstart;
129
130     private OperationOutcome opstart;
131     private OperationOutcome opend;
132
133     private Deque<OperationOutcome> starts;
134     private Deque<OperationOutcome> ends;
135
136     private OperatorConfig config;
137
138     /**
139      * Attaches the appender to the logger.
140      */
141     @BeforeAll
142     static void setUpBeforeClass() throws Exception {
143         /*
144          * Attach appender to the logger.
145          */
146         appender.setContext(logger.getLoggerContext());
147         appender.start();
148
149         logger.addAppender(appender);
150     }
151
152     /**
153      * Stops the appender.
154      */
155     @AfterAll
156     static void tearDownAfterClass() {
157         appender.stop();
158     }
159
160     /**
161      * Initializes the fields, including {@link #oper}.
162      */
163     @BeforeEach
164     void setUp() {
165         executor = new PseudoExecutor();
166
167         params = ControlLoopOperationParams.builder().completeCallback(this::completer).requestId(REQ_ID)
168                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
169                         .startCallback(this::starter).build();
170
171         config = new OperatorConfig(executor);
172
173         oper = new MyOper();
174
175         tstart = null;
176
177         opstart = null;
178         opend = null;
179
180         starts = new ArrayDeque<>(10);
181         ends = new ArrayDeque<>(10);
182     }
183
184     @Test
185     void testOperatorPartial_testGetActorName_testGetName() {
186         assertEquals(ACTOR, oper.getActorName());
187         assertEquals(OPERATION, oper.getName());
188         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
189     }
190
191     @Test
192      void testGetBlockingThread() throws Exception {
193         CompletableFuture<Void> future = new CompletableFuture<>();
194
195         // use the real executor
196         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
197             @Override
198             public Operation buildOperation(ControlLoopOperationParams params) {
199                 return null;
200             }
201         };
202
203         oper2.getBlockingExecutor().execute(() -> future.complete(null));
204
205         assertNull(future.get(5, TimeUnit.SECONDS));
206     }
207
208     @Test
209      void testGetPropertyNames() {
210         assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
211     }
212
213     @Test
214      void testGetProperty_testSetProperty_testGetRequiredProperty() {
215         oper.setProperty("propertyA", "valueA");
216         oper.setProperty("propertyB", "valueB");
217         oper.setProperty("propertyC", 20);
218         oper.setProperty("propertyD", "valueD");
219
220         assertEquals("valueA", oper.getProperty("propertyA"));
221         assertEquals("valueB", oper.getProperty("propertyB"));
222         assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
223
224         assertEquals("valueD", oper.getRequiredProperty("propertyD", "typeD"));
225
226         assertThatIllegalStateException().isThrownBy(() -> oper.getRequiredProperty("propertyUnknown", "some type"))
227                         .withMessage("missing some type");
228     }
229
230     @Test
231      void testStart() {
232         verifyRun("testStart", 1, 1, OperationResult.SUCCESS);
233     }
234
235     /**
236      * Tests start() with multiple running requests.
237      */
238     @Test
239      void testStartMultiple() {
240         for (int count = 0; count < MAX_PARALLEL; ++count) {
241             oper.start();
242         }
243
244         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
245
246         assertNotNull(opstart);
247         assertNotNull(opend);
248         assertEquals(OperationResult.SUCCESS, opend.getResult());
249
250         assertEquals(MAX_PARALLEL, numStart);
251         assertEquals(MAX_PARALLEL, oper.getCount());
252         assertEquals(MAX_PARALLEL, numEnd);
253     }
254
255     @Test
256      void testStartOperationAsync() {
257         oper.start();
258         assertTrue(executor.runAll(MAX_REQUESTS));
259
260         assertEquals(1, oper.getCount());
261     }
262
263     @Test
264      void testIsSuccess() {
265         assertFalse(oper.isSuccess(null));
266
267         OperationOutcome outcome = new OperationOutcome();
268
269         outcome.setResult(OperationResult.SUCCESS);
270         assertTrue(oper.isSuccess(outcome));
271
272         for (OperationResult failure : FAILURE_RESULTS) {
273             outcome.setResult(failure);
274             assertFalse(oper.isSuccess(outcome), "testIsSuccess-" + failure);
275         }
276     }
277
278     @Test
279      void testIsActorFailed() {
280         assertFalse(oper.isActorFailed(null));
281
282         OperationOutcome outcome = params.makeOutcome();
283
284         // incorrect outcome
285         outcome.setResult(OperationResult.SUCCESS);
286         assertFalse(oper.isActorFailed(outcome));
287
288         outcome.setResult(OperationResult.FAILURE_RETRIES);
289         assertFalse(oper.isActorFailed(outcome));
290
291         // correct outcome
292         outcome.setResult(OperationResult.FAILURE);
293
294         // incorrect actor
295         outcome.setActor(MY_SINK);
296         assertFalse(oper.isActorFailed(outcome));
297         outcome.setActor(null);
298         assertFalse(oper.isActorFailed(outcome));
299         outcome.setActor(ACTOR);
300
301         // incorrect operation
302         outcome.setOperation(MY_SINK);
303         assertFalse(oper.isActorFailed(outcome));
304         outcome.setOperation(null);
305         assertFalse(oper.isActorFailed(outcome));
306         outcome.setOperation(OPERATION);
307
308         // correct values
309         assertTrue(oper.isActorFailed(outcome));
310     }
311
312     @Test
313      void testDoOperation() {
314         /*
315          * Use an operation that doesn't override doOperation().
316          */
317         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
318
319         oper2.start();
320         assertTrue(executor.runAll(MAX_REQUESTS));
321
322         assertNotNull(opend);
323         assertEquals(OperationResult.FAILURE_EXCEPTION, opend.getResult());
324     }
325
326     @Test
327      void testTimeout() throws Exception {
328
329         // use a real executor
330         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
331
332         // trigger timeout very quickly
333         oper = new MyOper() {
334             @Override
335             protected long getTimeoutMs(Integer timeoutSec) {
336                 return 1;
337             }
338
339             @Override
340             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
341
342                 OperationOutcome outcome2 = params.makeOutcome();
343                 outcome2.setResult(OperationResult.SUCCESS);
344
345                 /*
346                  * Create an incomplete future that will timeout after the operation's
347                  * timeout. If it fires before the other timer, then it will return a
348                  * SUCCESS outcome.
349                  */
350                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
351                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
352                                 params.getExecutor());
353
354                 return future;
355             }
356         };
357
358         assertEquals(OperationResult.FAILURE_TIMEOUT, oper.start().get().getResult());
359     }
360
361     /**
362      * Tests retry functions, when the count is set to zero and retries are exhausted.
363      */
364     @Test
365      void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
366         params = params.toBuilder().retry(0).build();
367
368         // new params, thus need a new operation
369         oper = new MyOper();
370
371         oper.setMaxFailures(10);
372
373         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, OperationResult.FAILURE);
374     }
375
376     /**
377      * Tests retry functions, when the count is null and retries are exhausted.
378      */
379     @Test
380      void testSetRetryFlag_testRetryOnFailure_NullRetries() {
381         params = params.toBuilder().retry(null).build();
382
383         // new params, thus need a new operation
384         oper = new MyOper();
385
386         oper.setMaxFailures(10);
387
388         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, OperationResult.FAILURE);
389     }
390
391     /**
392      * Tests retry functions, when retries are exhausted.
393      */
394     @Test
395      void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
396         final int maxRetries = 3;
397         params = params.toBuilder().retry(maxRetries).build();
398
399         // new params, thus need a new operation
400         oper = new MyOper();
401
402         oper.setMaxFailures(10);
403
404         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
405                         OperationResult.FAILURE_RETRIES);
406     }
407
408     /**
409      * Tests retry functions, when a success follows some retries.
410      */
411     @Test
412      void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
413         params = params.toBuilder().retry(10).build();
414
415         // new params, thus need a new operation
416         oper = new MyOper();
417
418         final int maxFailures = 3;
419         oper.setMaxFailures(maxFailures);
420
421         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
422                         OperationResult.SUCCESS);
423     }
424
425     /**
426      * Tests retry functions, when the outcome is {@code null}.
427      */
428     @Test
429      void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
430
431         // arrange to return null from doOperation()
432         oper = new MyOper() {
433             @Override
434             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
435
436                 // update counters
437                 super.doOperation(attempt, outcome);
438                 return null;
439             }
440         };
441
442         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, OperationResult.FAILURE, noop());
443     }
444
445     @Test
446      void testSleep() throws Exception {
447         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
448         assertTrue(future.isDone());
449         assertNull(future.get());
450
451         // edge case
452         future = oper.sleep(0, TimeUnit.SECONDS);
453         assertTrue(future.isDone());
454         assertNull(future.get());
455
456         /*
457          * Start a second sleep we can use to check the first while it's running.
458          */
459         tstart = Instant.now();
460         future = oper.sleep(100, TimeUnit.MILLISECONDS);
461
462         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
463
464         // wait for second to complete and verify that the first has not completed
465         future2.get();
466         assertFalse(future.isDone());
467
468         // wait for second to complete
469         future.get();
470
471         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
472         assertTrue(diff >= 99);
473     }
474
475     @Test
476      void testIsSameOperation() {
477         assertFalse(oper.isSameOperation(null));
478
479         OperationOutcome outcome = params.makeOutcome();
480
481         // wrong actor - should be false
482         outcome.setActor(null);
483         assertFalse(oper.isSameOperation(outcome));
484         outcome.setActor(MY_SINK);
485         assertFalse(oper.isSameOperation(outcome));
486         outcome.setActor(ACTOR);
487
488         // wrong operation - should be null
489         outcome.setOperation(null);
490         assertFalse(oper.isSameOperation(outcome));
491         outcome.setOperation(MY_SINK);
492         assertFalse(oper.isSameOperation(outcome));
493         outcome.setOperation(OPERATION);
494
495         assertTrue(oper.isSameOperation(outcome));
496     }
497
498     @Test
499      void testFromException() {
500         // arrange to generate an exception when operation runs
501         oper.setGenException(true);
502
503         verifyRun("testFromException", 1, 1, OperationResult.FAILURE_EXCEPTION);
504     }
505
506     /**
507      * Tests fromException() when there is no exception.
508      */
509     @Test
510      void testFromExceptionNoExcept() {
511         verifyRun("testFromExceptionNoExcept", 1, 1, OperationResult.SUCCESS);
512     }
513
514     /**
515      * Tests both flavors of anyOf(), because one invokes the other.
516      */
517     @Test
518      void testAnyOf() throws Exception {
519         // first task completes, others do not
520         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
521
522         final OperationOutcome outcome = params.makeOutcome();
523
524         tasks.add(() -> CompletableFuture.completedFuture(outcome));
525         tasks.add(() -> new CompletableFuture<>());
526         tasks.add(() -> null);
527         tasks.add(() -> new CompletableFuture<>());
528
529         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
530         assertTrue(executor.runAll(MAX_REQUESTS));
531         assertTrue(result.isDone());
532         assertSame(outcome, result.get());
533
534         // repeat using array form
535         @SuppressWarnings("unchecked")
536         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
537         result = oper.anyOf(tasks.toArray(taskArray));
538         assertTrue(executor.runAll(MAX_REQUESTS));
539         assertTrue(result.isDone());
540         assertSame(outcome, result.get());
541
542         // second task completes, others do not
543         tasks.clear();
544         tasks.add(() -> new CompletableFuture<>());
545         tasks.add(() -> CompletableFuture.completedFuture(outcome));
546         tasks.add(() -> new CompletableFuture<>());
547
548         result = oper.anyOf(tasks);
549         assertTrue(executor.runAll(MAX_REQUESTS));
550         assertTrue(result.isDone());
551         assertSame(outcome, result.get());
552
553         // third task completes, others do not
554         tasks.clear();
555         tasks.add(() -> new CompletableFuture<>());
556         tasks.add(() -> new CompletableFuture<>());
557         tasks.add(() -> CompletableFuture.completedFuture(outcome));
558
559         result = oper.anyOf(tasks);
560         assertTrue(executor.runAll(MAX_REQUESTS));
561         assertTrue(result.isDone());
562         assertSame(outcome, result.get());
563     }
564
565     /**
566      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
567      */
568     @Test
569     @SuppressWarnings("unchecked")
570      void testAnyOfEdge() throws Exception {
571         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
572
573         // zero items: check both using a list and using an array
574         assertNull(oper.anyOf(tasks));
575         assertNull(oper.anyOf());
576
577         // one item: : check both using a list and using an array
578         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
579         tasks.add(() -> future1);
580
581         assertSame(future1, oper.anyOf(tasks));
582         assertSame(future1, oper.anyOf(() -> future1));
583     }
584
585     @Test
586      void testAllOfArray() throws Exception {
587         final OperationOutcome outcome = params.makeOutcome();
588
589         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
590         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
591         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
592
593         @SuppressWarnings("unchecked")
594         CompletableFuture<OperationOutcome> result =
595                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
596
597         assertTrue(executor.runAll(MAX_REQUESTS));
598         assertFalse(result.isDone());
599         future1.complete(outcome);
600
601         // complete 3 before 2
602         assertTrue(executor.runAll(MAX_REQUESTS));
603         assertFalse(result.isDone());
604         future3.complete(outcome);
605
606         assertTrue(executor.runAll(MAX_REQUESTS));
607         assertFalse(result.isDone());
608         future2.complete(outcome);
609
610         // all of them are now done
611         assertTrue(executor.runAll(MAX_REQUESTS));
612         assertTrue(result.isDone());
613         assertSame(outcome, result.get());
614     }
615
616     @Test
617      void testAllOfList() throws Exception {
618         final OperationOutcome outcome = params.makeOutcome();
619
620         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
621         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
622         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
623
624         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
625         tasks.add(() -> future1);
626         tasks.add(() -> future2);
627         tasks.add(() -> null);
628         tasks.add(() -> future3);
629
630         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
631
632         assertTrue(executor.runAll(MAX_REQUESTS));
633         assertFalse(result.isDone());
634         future1.complete(outcome);
635
636         // complete 3 before 2
637         assertTrue(executor.runAll(MAX_REQUESTS));
638         assertFalse(result.isDone());
639         future3.complete(outcome);
640
641         assertTrue(executor.runAll(MAX_REQUESTS));
642         assertFalse(result.isDone());
643         future2.complete(outcome);
644
645         // all of them are now done
646         assertTrue(executor.runAll(MAX_REQUESTS));
647         assertTrue(result.isDone());
648         assertSame(outcome, result.get());
649     }
650
651     /**
652      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
653      */
654     @Test
655     @SuppressWarnings("unchecked")
656      void testAllOfEdge() throws Exception {
657         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
658
659         // zero items: check both using a list and using an array
660         assertNull(oper.allOf(tasks));
661         assertNull(oper.allOf());
662
663         // one item: : check both using a list and using an array
664         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
665         tasks.add(() -> future1);
666
667         assertSame(future1, oper.allOf(tasks));
668         assertSame(future1, oper.allOf(() -> future1));
669     }
670
671     @Test
672      void testAttachFutures() throws Exception {
673         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
674
675         // third task throws an exception during construction
676         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
677         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
678         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
679         tasks.add(() -> future1);
680         tasks.add(() -> future2);
681         tasks.add(() -> {
682             throw new IllegalStateException(EXPECTED_EXCEPTION);
683         });
684         tasks.add(() -> future3);
685
686         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
687
688         // should have canceled the first two, but not the last
689         assertTrue(future1.isCancelled());
690         assertTrue(future2.isCancelled());
691         assertFalse(future3.isCancelled());
692     }
693
694     @Test
695      void testCombineOutcomes() throws Exception {
696         // only one outcome
697         verifyOutcomes(0, OperationResult.SUCCESS);
698         verifyOutcomes(0, OperationResult.FAILURE_EXCEPTION);
699
700         // maximum is in different positions
701         verifyOutcomes(0, OperationResult.FAILURE, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD);
702         verifyOutcomes(1, OperationResult.SUCCESS, OperationResult.FAILURE, OperationResult.FAILURE_GUARD);
703         verifyOutcomes(2, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD, OperationResult.FAILURE);
704
705         // null outcome - takes precedence over a success
706         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
707         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
708         tasks.add(() -> CompletableFuture.completedFuture(null));
709         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
710         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
711
712         assertTrue(executor.runAll(MAX_REQUESTS));
713         assertTrue(result.isDone());
714         assertNull(result.get());
715
716         // one throws an exception during execution
717         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
718
719         tasks.clear();
720         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
721         tasks.add(() -> CompletableFuture.failedFuture(except));
722         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
723         result = oper.allOf(tasks);
724
725         assertTrue(executor.runAll(MAX_REQUESTS));
726         assertTrue(result.isCompletedExceptionally());
727         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
728     }
729
730     /**
731      * Tests both flavors of sequence(), because one invokes the other.
732      */
733     @Test
734      void testSequence() throws Exception {
735         final OperationOutcome outcome = params.makeOutcome();
736
737         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
738         tasks.add(() -> CompletableFuture.completedFuture(outcome));
739         tasks.add(() -> null);
740         tasks.add(() -> CompletableFuture.completedFuture(outcome));
741         tasks.add(() -> CompletableFuture.completedFuture(outcome));
742
743         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
744         assertTrue(executor.runAll(MAX_REQUESTS));
745         assertTrue(result.isDone());
746         assertSame(outcome, result.get());
747
748         // repeat using array form
749         @SuppressWarnings("unchecked")
750         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
751         result = oper.sequence(tasks.toArray(taskArray));
752         assertTrue(executor.runAll(MAX_REQUESTS));
753         assertTrue(result.isDone());
754         assertSame(outcome, result.get());
755
756         // second task fails, third should not run
757         OperationOutcome failure = params.makeOutcome();
758         failure.setResult(OperationResult.FAILURE);
759         tasks.clear();
760         tasks.add(() -> CompletableFuture.completedFuture(outcome));
761         tasks.add(() -> CompletableFuture.completedFuture(failure));
762         tasks.add(() -> CompletableFuture.completedFuture(outcome));
763
764         result = oper.sequence(tasks);
765         assertTrue(executor.runAll(MAX_REQUESTS));
766         assertTrue(result.isDone());
767         assertSame(failure, result.get());
768     }
769
770     /**
771      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
772      */
773     @Test
774     @SuppressWarnings("unchecked")
775      void testSequenceEdge() throws Exception {
776         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
777
778         // zero items: check both using a list and using an array
779         assertNull(oper.sequence(tasks));
780         assertNull(oper.sequence());
781
782         // one item: : check both using a list and using an array
783         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
784         tasks.add(() -> future1);
785
786         assertSame(future1, oper.sequence(tasks));
787         assertSame(future1, oper.sequence(() -> future1));
788     }
789
790     private void verifyOutcomes(int expected, OperationResult... results) throws Exception {
791         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
792
793         OperationOutcome expectedOutcome = null;
794
795         for (int count = 0; count < results.length; ++count) {
796             OperationOutcome outcome = params.makeOutcome();
797             outcome.setResult(results[count]);
798             tasks.add(() -> CompletableFuture.completedFuture(outcome));
799
800             if (count == expected) {
801                 expectedOutcome = outcome;
802             }
803         }
804
805         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
806
807         assertTrue(executor.runAll(MAX_REQUESTS));
808         assertTrue(result.isDone());
809         assertSame(expectedOutcome, result.get());
810     }
811
812     @Test
813      void testDetmPriority() throws CoderException {
814         assertEquals(1, oper.detmPriority(null));
815
816         OperationOutcome outcome = params.makeOutcome();
817
818         Map<OperationResult, Integer> map = Map.of(OperationResult.SUCCESS, 0, OperationResult.FAILURE_GUARD, 2,
819                 OperationResult.FAILURE_RETRIES, 3, OperationResult.FAILURE, 4, OperationResult.FAILURE_TIMEOUT, 5,
820                 OperationResult.FAILURE_EXCEPTION, 6);
821
822         for (Entry<OperationResult, Integer> ent : map.entrySet()) {
823             outcome.setResult(ent.getKey());
824             assertEquals(ent.getValue().intValue(), oper.detmPriority(outcome), ent.getKey().toString());
825         }
826
827         /*
828          * Test null result. We can't actually set it to null, because the set() method
829          * won't allow it. Instead, we decode it from a structure.
830          */
831         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
832         assertEquals(1, oper.detmPriority(outcome));
833     }
834
835     /**
836      * Tests callbackStarted() when the pipeline has already been stopped.
837      */
838     @Test
839      void testCallbackStartedNotRunning() {
840         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
841
842         /*
843          * arrange to stop the controller when the start-callback is invoked, but capture
844          * the outcome
845          */
846         params = params.toBuilder().startCallback(oper -> {
847             starter(oper);
848             future.get().cancel(false);
849         }).build();
850
851         // new params, thus need a new operation
852         oper = new MyOper();
853
854         future.set(oper.start());
855         assertTrue(executor.runAll(MAX_REQUESTS));
856
857         // should have only run once
858         assertEquals(1, numStart);
859     }
860
861     /**
862      * Tests callbackCompleted() when the pipeline has already been stopped.
863      */
864     @Test
865      void testCallbackCompletedNotRunning() {
866         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
867
868         // arrange to stop the controller when the start-callback is invoked
869         params = params.toBuilder().startCallback(oper -> {
870             future.get().cancel(false);
871         }).build();
872
873         // new params, thus need a new operation
874         oper = new MyOper();
875
876         future.set(oper.start());
877         assertTrue(executor.runAll(MAX_REQUESTS));
878
879         // should not have been set
880         assertNull(opend);
881         assertEquals(0, numEnd);
882     }
883
884     @Test
885      void testSetOutcomeControlLoopOperationOutcomeThrowable() {
886         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
887
888         OperationOutcome outcome;
889
890         outcome = new OperationOutcome();
891         oper.setOutcome(outcome, timex);
892         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
893         assertEquals(OperationResult.FAILURE_TIMEOUT, outcome.getResult());
894
895         outcome = new OperationOutcome();
896         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
897         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
898         assertEquals(OperationResult.FAILURE_EXCEPTION, outcome.getResult());
899     }
900
901     @Test
902      void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
903         OperationOutcome outcome;
904
905         outcome = new OperationOutcome();
906         oper.setOutcome(outcome, OperationResult.SUCCESS);
907         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
908         assertEquals(OperationResult.SUCCESS, outcome.getResult());
909
910         oper.setOutcome(outcome, OperationResult.SUCCESS);
911         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
912         assertEquals(OperationResult.SUCCESS, outcome.getResult());
913
914         for (OperationResult result : FAILURE_RESULTS) {
915             outcome = new OperationOutcome();
916             oper.setOutcome(outcome, result);
917             assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage(), result.toString());
918             assertEquals(result, outcome.getResult(), result.toString());
919         }
920     }
921
922     @Test
923      void testMakeOutcome() {
924         oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, MY_TARGET_ENTITY);
925         assertEquals(MY_TARGET_ENTITY, oper.makeOutcome().getTarget());
926     }
927
928     @Test
929      void testIsTimeout() {
930         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
931
932         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
933         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
934         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
935         assertFalse(oper.isTimeout(new CompletionException(null)));
936         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
937
938         assertTrue(oper.isTimeout(timex));
939         assertTrue(oper.isTimeout(new CompletionException(timex)));
940     }
941
942     @Test
943      void testLogMessage() {
944         final String infraStr = SINK_INFRA.toString();
945
946         // log structured data
947         appender.clearExtractions();
948         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
949         List<String> output = appender.getExtracted();
950         assertEquals(1, output.size());
951
952         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
953                         .contains("{\n  \"text\": \"my-text\"\n}");
954
955         // repeat with a response
956         appender.clearExtractions();
957         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
958         output = appender.getExtracted();
959         assertEquals(1, output.size());
960
961         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
962                         .contains("{\n  \"text\": \"my-text\"\n}");
963
964         // log a plain string
965         appender.clearExtractions();
966         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
967         output = appender.getExtracted();
968         assertEquals(1, output.size());
969         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
970
971         // log a null request
972         appender.clearExtractions();
973         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
974         output = appender.getExtracted();
975         assertEquals(1, output.size());
976
977         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
978
979         // generate exception from coder
980         setOperCoderException();
981
982         appender.clearExtractions();
983         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
984         output = appender.getExtracted();
985         assertEquals(2, output.size());
986         assertThat(output.get(0)).contains("cannot pretty-print request");
987         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
988
989         // repeat with a response
990         appender.clearExtractions();
991         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
992         output = appender.getExtracted();
993         assertEquals(2, output.size());
994         assertThat(output.get(0)).contains("cannot pretty-print response");
995         assertThat(output.get(1)).contains(MY_SOURCE);
996     }
997
998     @Test
999      void testGetRetry() {
1000         assertEquals(0, oper.getRetry(null));
1001         assertEquals(10, oper.getRetry(10));
1002     }
1003
1004     @Test
1005      void testGetRetryWait() {
1006         // need an operator that doesn't override the retry time
1007         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1008         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1009     }
1010
1011     @Test
1012      void testGetTimeOutMs() {
1013         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1014
1015         params = params.toBuilder().timeoutSec(null).build();
1016
1017         // new params, thus need a new operation
1018         oper = new MyOper();
1019
1020         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1021     }
1022
1023     private void starter(OperationOutcome oper) {
1024         ++numStart;
1025         tstart = oper.getStart();
1026         opstart = oper;
1027         starts.add(oper);
1028     }
1029
1030     private void completer(OperationOutcome oper) {
1031         ++numEnd;
1032         opend = oper;
1033         ends.add(oper);
1034     }
1035
1036     /**
1037      * Gets a function that does nothing.
1038      *
1039      * @param <T> type of input parameter expected by the function
1040      * @return a function that does nothing
1041      */
1042     private <T> Consumer<T> noop() {
1043         return unused -> {
1044         };
1045     }
1046
1047     /**
1048      * Verifies a run.
1049      *
1050      * @param testName test name
1051      * @param expectedCallbacks number of callbacks expected
1052      * @param expectedOperations number of operation invocations expected
1053      * @param expectedResult expected outcome
1054      */
1055     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1056             OperationResult expectedResult) {
1057
1058         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
1059     }
1060
1061     /**
1062      * Verifies a run.
1063      *
1064      * @param testName test name
1065      * @param expectedCallbacks number of callbacks expected
1066      * @param expectedOperations number of operation invocations expected
1067      * @param expectedResult expected outcome
1068      * @param manipulator function to modify the future returned by
1069      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1070      *        in the executor are run
1071      */
1072     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1073             OperationResult expectedResult, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1074
1075         tstart = null;
1076         opstart = null;
1077         opend = null;
1078         starts.clear();
1079         ends.clear();
1080
1081         CompletableFuture<OperationOutcome> future = oper.start();
1082
1083         manipulator.accept(future);
1084
1085         assertTrue(executor.runAll(MAX_REQUESTS), testName);
1086
1087         assertEquals(expectedCallbacks, numStart, testName);
1088         assertEquals(expectedCallbacks, numEnd, testName);
1089
1090         if (expectedCallbacks > 0) {
1091             assertNotNull(opstart, testName);
1092             assertNotNull(opend, testName);
1093             assertEquals(expectedResult, opend.getResult(), testName);
1094
1095             assertSame(tstart, opstart.getStart(), testName);
1096             assertSame(tstart, opend.getStart(), testName);
1097
1098             try {
1099                 assertTrue(future.isDone());
1100                 assertEquals(opend, future.get(), testName);
1101
1102                 // "start" is never final
1103                 for (OperationOutcome outcome : starts) {
1104                     assertFalse(outcome.isFinalOutcome(), testName);
1105                 }
1106
1107                 // only the last "complete" is final
1108                 assertTrue(ends.removeLast().isFinalOutcome(), testName);
1109
1110                 for (OperationOutcome outcome : ends) {
1111                     assertFalse(outcome.isFinalOutcome());
1112                 }
1113
1114             } catch (InterruptedException | ExecutionException e) {
1115                 throw new IllegalStateException(e);
1116             }
1117
1118             if (expectedOperations > 0) {
1119                 assertNotNull(testName, oper.getSubRequestId());
1120                 assertEquals(oper.getSubRequestId(), opstart.getSubRequestId(), testName + " op start");
1121                 assertEquals(oper.getSubRequestId(), opend.getSubRequestId(), testName + " op end");
1122             }
1123         }
1124
1125         assertEquals(expectedOperations, oper.getCount(), testName);
1126     }
1127
1128     /**
1129      * Creates a new {@link #oper} whose coder will throw an exception.
1130      */
1131     private void setOperCoderException() {
1132         oper = new MyOper() {
1133             @Override
1134             protected Coder getCoder() {
1135                 return new StandardCoder() {
1136                     @Override
1137                     public String encode(Object object, boolean pretty) throws CoderException {
1138                         throw new CoderException(EXPECTED_EXCEPTION);
1139                     }
1140                 };
1141             }
1142         };
1143     }
1144
1145
1146     @Getter
1147     static class MyData {
1148         private String text = TEXT;
1149     }
1150
1151
1152     private class MyOper extends OperationPartial {
1153         @Getter
1154         private int count = 0;
1155
1156         @Setter
1157         private boolean genException;
1158         @Setter
1159         private int maxFailures = 0;
1160         @Setter
1161         private CompletableFuture<OperationOutcome> preProc;
1162
1163
1164         MyOper() {
1165             super(OperationPartialTest.this.params, config, PROP_NAMES);
1166         }
1167
1168         @Override
1169         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1170             ++count;
1171             if (genException) {
1172                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1173             }
1174
1175             operation.setSubRequestId(String.valueOf(attempt));
1176
1177             if (count > maxFailures) {
1178                 operation.setResult(OperationResult.SUCCESS);
1179             } else {
1180                 operation.setResult(OperationResult.FAILURE);
1181             }
1182
1183             return operation;
1184         }
1185
1186         @Override
1187         protected long getRetryWaitMs() {
1188             /*
1189              * Sleep timers run in the background, but we want to control things via the
1190              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1191              */
1192             return 0L;
1193         }
1194     }
1195 }