d617a72ee8c95bdd9243ea3ce0a36482fdd94039
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31
32 import ch.qos.logback.classic.Logger;
33 import java.time.Instant;
34 import java.util.ArrayDeque;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.Deque;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Map.Entry;
42 import java.util.UUID;
43 import java.util.concurrent.CompletableFuture;
44 import java.util.concurrent.CompletionException;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.ForkJoinPool;
47 import java.util.concurrent.Future;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.TimeoutException;
50 import java.util.concurrent.atomic.AtomicReference;
51 import java.util.function.Consumer;
52 import java.util.function.Supplier;
53 import java.util.stream.Collectors;
54 import lombok.Getter;
55 import lombok.Setter;
56 import org.junit.AfterClass;
57 import org.junit.Before;
58 import org.junit.BeforeClass;
59 import org.junit.Test;
60 import org.junit.runner.RunWith;
61 import org.mockito.Mock;
62 import org.mockito.junit.MockitoJUnitRunner;
63 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
64 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
65 import org.onap.policy.common.utils.coder.Coder;
66 import org.onap.policy.common.utils.coder.CoderException;
67 import org.onap.policy.common.utils.coder.StandardCoder;
68 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
69 import org.onap.policy.common.utils.time.PseudoExecutor;
70 import org.onap.policy.controlloop.ControlLoopOperation;
71 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
72 import org.onap.policy.controlloop.actorserviceprovider.Operation;
73 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
74 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
75 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
76 import org.onap.policy.controlloop.actorserviceprovider.Operator;
77 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
78 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
79 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
80 import org.slf4j.LoggerFactory;
81
82 @RunWith(MockitoJUnitRunner.class)
83 public class OperationPartialTest {
84     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
85     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
86     private static final int MAX_REQUESTS = 100;
87     private static final int MAX_PARALLEL = 10;
88     private static final String EXPECTED_EXCEPTION = "expected exception";
89     private static final String ACTOR = "my-actor";
90     private static final String OPERATION = "my-operation";
91     private static final String MY_SINK = "my-sink";
92     private static final String MY_SOURCE = "my-source";
93     private static final String MY_TARGET_ENTITY = "my-entity";
94     private static final String TEXT = "my-text";
95     private static final int TIMEOUT = 1000;
96     private static final UUID REQ_ID = UUID.randomUUID();
97
98     private static final List<OperationResult> FAILURE_RESULTS = Arrays.asList(OperationResult.values()).stream()
99                     .filter(result -> result != OperationResult.SUCCESS).collect(Collectors.toList());
100
101     /**
102      * Used to attach an appender to the class' logger.
103      */
104     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
105     private static final ExtractAppender appender = new ExtractAppender();
106
107     private static final List<String> PROP_NAMES = List.of("hello", "world");
108
109     @Mock
110     private ActorService service;
111     @Mock
112     private Actor guardActor;
113     @Mock
114     private Operator guardOperator;
115     @Mock
116     private Operation guardOperation;
117
118     private PseudoExecutor executor;
119     private ControlLoopOperationParams params;
120
121     private MyOper oper;
122
123     private int numStart;
124     private int numEnd;
125
126     private Instant tstart;
127
128     private OperationOutcome opstart;
129     private OperationOutcome opend;
130
131     private Deque<OperationOutcome> starts;
132     private Deque<OperationOutcome> ends;
133
134     private OperatorConfig config;
135
136     /**
137      * Attaches the appender to the logger.
138      */
139     @BeforeClass
140     public static void setUpBeforeClass() throws Exception {
141         /*
142          * Attach appender to the logger.
143          */
144         appender.setContext(logger.getLoggerContext());
145         appender.start();
146
147         logger.addAppender(appender);
148     }
149
150     /**
151      * Stops the appender.
152      */
153     @AfterClass
154     public static void tearDownAfterClass() {
155         appender.stop();
156     }
157
158     /**
159      * Initializes the fields, including {@link #oper}.
160      */
161     @Before
162     public void setUp() {
163         executor = new PseudoExecutor();
164
165         params = ControlLoopOperationParams.builder().completeCallback(this::completer).requestId(REQ_ID)
166                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
167                         .startCallback(this::starter).build();
168
169         config = new OperatorConfig(executor);
170
171         oper = new MyOper();
172
173         tstart = null;
174
175         opstart = null;
176         opend = null;
177
178         starts = new ArrayDeque<>(10);
179         ends = new ArrayDeque<>(10);
180     }
181
182     @Test
183     public void testOperatorPartial_testGetActorName_testGetName() {
184         assertEquals(ACTOR, oper.getActorName());
185         assertEquals(OPERATION, oper.getName());
186         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
187     }
188
189     @Test
190     public void testGetBlockingThread() throws Exception {
191         CompletableFuture<Void> future = new CompletableFuture<>();
192
193         // use the real executor
194         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
195             @Override
196             public Operation buildOperation(ControlLoopOperationParams params) {
197                 return null;
198             }
199         };
200
201         oper2.getBlockingExecutor().execute(() -> future.complete(null));
202
203         assertNull(future.get(5, TimeUnit.SECONDS));
204     }
205
206     @Test
207     public void testGetPropertyNames() {
208         assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
209     }
210
211     @Test
212     public void testGetProperty_testSetProperty_testGetRequiredProperty() {
213         oper.setProperty("propertyA", "valueA");
214         oper.setProperty("propertyB", "valueB");
215         oper.setProperty("propertyC", 20);
216         oper.setProperty("propertyD", "valueD");
217
218         assertEquals("valueA", oper.getProperty("propertyA"));
219         assertEquals("valueB", oper.getProperty("propertyB"));
220         assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
221
222         assertEquals("valueD", oper.getRequiredProperty("propertyD", "typeD"));
223
224         assertThatIllegalStateException().isThrownBy(() -> oper.getRequiredProperty("propertyUnknown", "some type"))
225                         .withMessage("missing some type");
226     }
227
228     @Test
229     public void testStart() {
230         verifyRun("testStart", 1, 1, OperationResult.SUCCESS);
231     }
232
233     /**
234      * Tests start() with multiple running requests.
235      */
236     @Test
237     public void testStartMultiple() {
238         for (int count = 0; count < MAX_PARALLEL; ++count) {
239             oper.start();
240         }
241
242         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
243
244         assertNotNull(opstart);
245         assertNotNull(opend);
246         assertEquals(OperationResult.SUCCESS, opend.getResult());
247
248         assertEquals(MAX_PARALLEL, numStart);
249         assertEquals(MAX_PARALLEL, oper.getCount());
250         assertEquals(MAX_PARALLEL, numEnd);
251     }
252
253     @Test
254     public void testStartOperationAsync() {
255         oper.start();
256         assertTrue(executor.runAll(MAX_REQUESTS));
257
258         assertEquals(1, oper.getCount());
259     }
260
261     @Test
262     public void testIsSuccess() {
263         assertFalse(oper.isSuccess(null));
264
265         OperationOutcome outcome = new OperationOutcome();
266
267         outcome.setResult(OperationResult.SUCCESS);
268         assertTrue(oper.isSuccess(outcome));
269
270         for (OperationResult failure : FAILURE_RESULTS) {
271             outcome.setResult(failure);
272             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
273         }
274     }
275
276     @Test
277     public void testIsActorFailed() {
278         assertFalse(oper.isActorFailed(null));
279
280         OperationOutcome outcome = params.makeOutcome();
281
282         // incorrect outcome
283         outcome.setResult(OperationResult.SUCCESS);
284         assertFalse(oper.isActorFailed(outcome));
285
286         outcome.setResult(OperationResult.FAILURE_RETRIES);
287         assertFalse(oper.isActorFailed(outcome));
288
289         // correct outcome
290         outcome.setResult(OperationResult.FAILURE);
291
292         // incorrect actor
293         outcome.setActor(MY_SINK);
294         assertFalse(oper.isActorFailed(outcome));
295         outcome.setActor(null);
296         assertFalse(oper.isActorFailed(outcome));
297         outcome.setActor(ACTOR);
298
299         // incorrect operation
300         outcome.setOperation(MY_SINK);
301         assertFalse(oper.isActorFailed(outcome));
302         outcome.setOperation(null);
303         assertFalse(oper.isActorFailed(outcome));
304         outcome.setOperation(OPERATION);
305
306         // correct values
307         assertTrue(oper.isActorFailed(outcome));
308     }
309
310     @Test
311     public void testDoOperation() {
312         /*
313          * Use an operation that doesn't override doOperation().
314          */
315         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
316
317         oper2.start();
318         assertTrue(executor.runAll(MAX_REQUESTS));
319
320         assertNotNull(opend);
321         assertEquals(OperationResult.FAILURE_EXCEPTION, opend.getResult());
322     }
323
324     @Test
325     public void testTimeout() throws Exception {
326
327         // use a real executor
328         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
329
330         // trigger timeout very quickly
331         oper = new MyOper() {
332             @Override
333             protected long getTimeoutMs(Integer timeoutSec) {
334                 return 1;
335             }
336
337             @Override
338             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
339
340                 OperationOutcome outcome2 = params.makeOutcome();
341                 outcome2.setResult(OperationResult.SUCCESS);
342
343                 /*
344                  * Create an incomplete future that will timeout after the operation's
345                  * timeout. If it fires before the other timer, then it will return a
346                  * SUCCESS outcome.
347                  */
348                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
349                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
350                                 params.getExecutor());
351
352                 return future;
353             }
354         };
355
356         assertEquals(OperationResult.FAILURE_TIMEOUT, oper.start().get().getResult());
357     }
358
359     /**
360      * Tests retry functions, when the count is set to zero and retries are exhausted.
361      */
362     @Test
363     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
364         params = params.toBuilder().retry(0).build();
365
366         // new params, thus need a new operation
367         oper = new MyOper();
368
369         oper.setMaxFailures(10);
370
371         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, OperationResult.FAILURE);
372     }
373
374     /**
375      * Tests retry functions, when the count is null and retries are exhausted.
376      */
377     @Test
378     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
379         params = params.toBuilder().retry(null).build();
380
381         // new params, thus need a new operation
382         oper = new MyOper();
383
384         oper.setMaxFailures(10);
385
386         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, OperationResult.FAILURE);
387     }
388
389     /**
390      * Tests retry functions, when retries are exhausted.
391      */
392     @Test
393     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
394         final int maxRetries = 3;
395         params = params.toBuilder().retry(maxRetries).build();
396
397         // new params, thus need a new operation
398         oper = new MyOper();
399
400         oper.setMaxFailures(10);
401
402         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
403                         OperationResult.FAILURE_RETRIES);
404     }
405
406     /**
407      * Tests retry functions, when a success follows some retries.
408      */
409     @Test
410     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
411         params = params.toBuilder().retry(10).build();
412
413         // new params, thus need a new operation
414         oper = new MyOper();
415
416         final int maxFailures = 3;
417         oper.setMaxFailures(maxFailures);
418
419         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
420                         OperationResult.SUCCESS);
421     }
422
423     /**
424      * Tests retry functions, when the outcome is {@code null}.
425      */
426     @Test
427     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
428
429         // arrange to return null from doOperation()
430         oper = new MyOper() {
431             @Override
432             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
433
434                 // update counters
435                 super.doOperation(attempt, outcome);
436                 return null;
437             }
438         };
439
440         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, OperationResult.FAILURE, noop());
441     }
442
443     @Test
444     public void testSleep() throws Exception {
445         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
446         assertTrue(future.isDone());
447         assertNull(future.get());
448
449         // edge case
450         future = oper.sleep(0, TimeUnit.SECONDS);
451         assertTrue(future.isDone());
452         assertNull(future.get());
453
454         /*
455          * Start a second sleep we can use to check the first while it's running.
456          */
457         tstart = Instant.now();
458         future = oper.sleep(100, TimeUnit.MILLISECONDS);
459
460         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
461
462         // wait for second to complete and verify that the first has not completed
463         future2.get();
464         assertFalse(future.isDone());
465
466         // wait for second to complete
467         future.get();
468
469         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
470         assertTrue(diff >= 99);
471     }
472
473     @Test
474     public void testIsSameOperation() {
475         assertFalse(oper.isSameOperation(null));
476
477         OperationOutcome outcome = params.makeOutcome();
478
479         // wrong actor - should be false
480         outcome.setActor(null);
481         assertFalse(oper.isSameOperation(outcome));
482         outcome.setActor(MY_SINK);
483         assertFalse(oper.isSameOperation(outcome));
484         outcome.setActor(ACTOR);
485
486         // wrong operation - should be null
487         outcome.setOperation(null);
488         assertFalse(oper.isSameOperation(outcome));
489         outcome.setOperation(MY_SINK);
490         assertFalse(oper.isSameOperation(outcome));
491         outcome.setOperation(OPERATION);
492
493         assertTrue(oper.isSameOperation(outcome));
494     }
495
496     @Test
497     public void testFromException() {
498         // arrange to generate an exception when operation runs
499         oper.setGenException(true);
500
501         verifyRun("testFromException", 1, 1, OperationResult.FAILURE_EXCEPTION);
502     }
503
504     /**
505      * Tests fromException() when there is no exception.
506      */
507     @Test
508     public void testFromExceptionNoExcept() {
509         verifyRun("testFromExceptionNoExcept", 1, 1, OperationResult.SUCCESS);
510     }
511
512     /**
513      * Tests both flavors of anyOf(), because one invokes the other.
514      */
515     @Test
516     public void testAnyOf() throws Exception {
517         // first task completes, others do not
518         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
519
520         final OperationOutcome outcome = params.makeOutcome();
521
522         tasks.add(() -> CompletableFuture.completedFuture(outcome));
523         tasks.add(() -> new CompletableFuture<>());
524         tasks.add(() -> null);
525         tasks.add(() -> new CompletableFuture<>());
526
527         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
528         assertTrue(executor.runAll(MAX_REQUESTS));
529         assertTrue(result.isDone());
530         assertSame(outcome, result.get());
531
532         // repeat using array form
533         @SuppressWarnings("unchecked")
534         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
535         result = oper.anyOf(tasks.toArray(taskArray));
536         assertTrue(executor.runAll(MAX_REQUESTS));
537         assertTrue(result.isDone());
538         assertSame(outcome, result.get());
539
540         // second task completes, others do not
541         tasks.clear();
542         tasks.add(() -> new CompletableFuture<>());
543         tasks.add(() -> CompletableFuture.completedFuture(outcome));
544         tasks.add(() -> new CompletableFuture<>());
545
546         result = oper.anyOf(tasks);
547         assertTrue(executor.runAll(MAX_REQUESTS));
548         assertTrue(result.isDone());
549         assertSame(outcome, result.get());
550
551         // third task completes, others do not
552         tasks.clear();
553         tasks.add(() -> new CompletableFuture<>());
554         tasks.add(() -> new CompletableFuture<>());
555         tasks.add(() -> CompletableFuture.completedFuture(outcome));
556
557         result = oper.anyOf(tasks);
558         assertTrue(executor.runAll(MAX_REQUESTS));
559         assertTrue(result.isDone());
560         assertSame(outcome, result.get());
561     }
562
563     /**
564      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
565      */
566     @Test
567     @SuppressWarnings("unchecked")
568     public void testAnyOfEdge() throws Exception {
569         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
570
571         // zero items: check both using a list and using an array
572         assertNull(oper.anyOf(tasks));
573         assertNull(oper.anyOf());
574
575         // one item: : check both using a list and using an array
576         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
577         tasks.add(() -> future1);
578
579         assertSame(future1, oper.anyOf(tasks));
580         assertSame(future1, oper.anyOf(() -> future1));
581     }
582
583     @Test
584     public void testAllOfArray() throws Exception {
585         final OperationOutcome outcome = params.makeOutcome();
586
587         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
588         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
589         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
590
591         @SuppressWarnings("unchecked")
592         CompletableFuture<OperationOutcome> result =
593                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
594
595         assertTrue(executor.runAll(MAX_REQUESTS));
596         assertFalse(result.isDone());
597         future1.complete(outcome);
598
599         // complete 3 before 2
600         assertTrue(executor.runAll(MAX_REQUESTS));
601         assertFalse(result.isDone());
602         future3.complete(outcome);
603
604         assertTrue(executor.runAll(MAX_REQUESTS));
605         assertFalse(result.isDone());
606         future2.complete(outcome);
607
608         // all of them are now done
609         assertTrue(executor.runAll(MAX_REQUESTS));
610         assertTrue(result.isDone());
611         assertSame(outcome, result.get());
612     }
613
614     @Test
615     public void testAllOfList() throws Exception {
616         final OperationOutcome outcome = params.makeOutcome();
617
618         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
619         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
620         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
621
622         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
623         tasks.add(() -> future1);
624         tasks.add(() -> future2);
625         tasks.add(() -> null);
626         tasks.add(() -> future3);
627
628         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
629
630         assertTrue(executor.runAll(MAX_REQUESTS));
631         assertFalse(result.isDone());
632         future1.complete(outcome);
633
634         // complete 3 before 2
635         assertTrue(executor.runAll(MAX_REQUESTS));
636         assertFalse(result.isDone());
637         future3.complete(outcome);
638
639         assertTrue(executor.runAll(MAX_REQUESTS));
640         assertFalse(result.isDone());
641         future2.complete(outcome);
642
643         // all of them are now done
644         assertTrue(executor.runAll(MAX_REQUESTS));
645         assertTrue(result.isDone());
646         assertSame(outcome, result.get());
647     }
648
649     /**
650      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
651      */
652     @Test
653     @SuppressWarnings("unchecked")
654     public void testAllOfEdge() throws Exception {
655         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
656
657         // zero items: check both using a list and using an array
658         assertNull(oper.allOf(tasks));
659         assertNull(oper.allOf());
660
661         // one item: : check both using a list and using an array
662         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
663         tasks.add(() -> future1);
664
665         assertSame(future1, oper.allOf(tasks));
666         assertSame(future1, oper.allOf(() -> future1));
667     }
668
669     @Test
670     public void testAttachFutures() throws Exception {
671         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
672
673         // third task throws an exception during construction
674         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
675         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
676         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
677         tasks.add(() -> future1);
678         tasks.add(() -> future2);
679         tasks.add(() -> {
680             throw new IllegalStateException(EXPECTED_EXCEPTION);
681         });
682         tasks.add(() -> future3);
683
684         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
685
686         // should have canceled the first two, but not the last
687         assertTrue(future1.isCancelled());
688         assertTrue(future2.isCancelled());
689         assertFalse(future3.isCancelled());
690     }
691
692     @Test
693     public void testCombineOutcomes() throws Exception {
694         // only one outcome
695         verifyOutcomes(0, OperationResult.SUCCESS);
696         verifyOutcomes(0, OperationResult.FAILURE_EXCEPTION);
697
698         // maximum is in different positions
699         verifyOutcomes(0, OperationResult.FAILURE, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD);
700         verifyOutcomes(1, OperationResult.SUCCESS, OperationResult.FAILURE, OperationResult.FAILURE_GUARD);
701         verifyOutcomes(2, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD, OperationResult.FAILURE);
702
703         // null outcome - takes precedence over a success
704         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
705         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
706         tasks.add(() -> CompletableFuture.completedFuture(null));
707         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
708         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
709
710         assertTrue(executor.runAll(MAX_REQUESTS));
711         assertTrue(result.isDone());
712         assertNull(result.get());
713
714         // one throws an exception during execution
715         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
716
717         tasks.clear();
718         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
719         tasks.add(() -> CompletableFuture.failedFuture(except));
720         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
721         result = oper.allOf(tasks);
722
723         assertTrue(executor.runAll(MAX_REQUESTS));
724         assertTrue(result.isCompletedExceptionally());
725         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
726     }
727
728     /**
729      * Tests both flavors of sequence(), because one invokes the other.
730      */
731     @Test
732     public void testSequence() throws Exception {
733         final OperationOutcome outcome = params.makeOutcome();
734
735         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
736         tasks.add(() -> CompletableFuture.completedFuture(outcome));
737         tasks.add(() -> null);
738         tasks.add(() -> CompletableFuture.completedFuture(outcome));
739         tasks.add(() -> CompletableFuture.completedFuture(outcome));
740
741         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
742         assertTrue(executor.runAll(MAX_REQUESTS));
743         assertTrue(result.isDone());
744         assertSame(outcome, result.get());
745
746         // repeat using array form
747         @SuppressWarnings("unchecked")
748         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
749         result = oper.sequence(tasks.toArray(taskArray));
750         assertTrue(executor.runAll(MAX_REQUESTS));
751         assertTrue(result.isDone());
752         assertSame(outcome, result.get());
753
754         // second task fails, third should not run
755         OperationOutcome failure = params.makeOutcome();
756         failure.setResult(OperationResult.FAILURE);
757         tasks.clear();
758         tasks.add(() -> CompletableFuture.completedFuture(outcome));
759         tasks.add(() -> CompletableFuture.completedFuture(failure));
760         tasks.add(() -> CompletableFuture.completedFuture(outcome));
761
762         result = oper.sequence(tasks);
763         assertTrue(executor.runAll(MAX_REQUESTS));
764         assertTrue(result.isDone());
765         assertSame(failure, result.get());
766     }
767
768     /**
769      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
770      */
771     @Test
772     @SuppressWarnings("unchecked")
773     public void testSequenceEdge() throws Exception {
774         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
775
776         // zero items: check both using a list and using an array
777         assertNull(oper.sequence(tasks));
778         assertNull(oper.sequence());
779
780         // one item: : check both using a list and using an array
781         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
782         tasks.add(() -> future1);
783
784         assertSame(future1, oper.sequence(tasks));
785         assertSame(future1, oper.sequence(() -> future1));
786     }
787
788     private void verifyOutcomes(int expected, OperationResult... results) throws Exception {
789         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
790
791         OperationOutcome expectedOutcome = null;
792
793         for (int count = 0; count < results.length; ++count) {
794             OperationOutcome outcome = params.makeOutcome();
795             outcome.setResult(results[count]);
796             tasks.add(() -> CompletableFuture.completedFuture(outcome));
797
798             if (count == expected) {
799                 expectedOutcome = outcome;
800             }
801         }
802
803         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
804
805         assertTrue(executor.runAll(MAX_REQUESTS));
806         assertTrue(result.isDone());
807         assertSame(expectedOutcome, result.get());
808     }
809
810     @Test
811     public void testDetmPriority() throws CoderException {
812         assertEquals(1, oper.detmPriority(null));
813
814         OperationOutcome outcome = params.makeOutcome();
815
816         Map<OperationResult, Integer> map = Map.of(OperationResult.SUCCESS, 0, OperationResult.FAILURE_GUARD, 2,
817                 OperationResult.FAILURE_RETRIES, 3, OperationResult.FAILURE, 4, OperationResult.FAILURE_TIMEOUT, 5,
818                 OperationResult.FAILURE_EXCEPTION, 6);
819
820         for (Entry<OperationResult, Integer> ent : map.entrySet()) {
821             outcome.setResult(ent.getKey());
822             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
823         }
824
825         /*
826          * Test null result. We can't actually set it to null, because the set() method
827          * won't allow it. Instead, we decode it from a structure.
828          */
829         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
830         assertEquals(1, oper.detmPriority(outcome));
831     }
832
833     /**
834      * Tests callbackStarted() when the pipeline has already been stopped.
835      */
836     @Test
837     public void testCallbackStartedNotRunning() {
838         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
839
840         /*
841          * arrange to stop the controller when the start-callback is invoked, but capture
842          * the outcome
843          */
844         params = params.toBuilder().startCallback(oper -> {
845             starter(oper);
846             future.get().cancel(false);
847         }).build();
848
849         // new params, thus need a new operation
850         oper = new MyOper();
851
852         future.set(oper.start());
853         assertTrue(executor.runAll(MAX_REQUESTS));
854
855         // should have only run once
856         assertEquals(1, numStart);
857     }
858
859     /**
860      * Tests callbackCompleted() when the pipeline has already been stopped.
861      */
862     @Test
863     public void testCallbackCompletedNotRunning() {
864         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
865
866         // arrange to stop the controller when the start-callback is invoked
867         params = params.toBuilder().startCallback(oper -> {
868             future.get().cancel(false);
869         }).build();
870
871         // new params, thus need a new operation
872         oper = new MyOper();
873
874         future.set(oper.start());
875         assertTrue(executor.runAll(MAX_REQUESTS));
876
877         // should not have been set
878         assertNull(opend);
879         assertEquals(0, numEnd);
880     }
881
882     @Test
883     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
884         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
885
886         OperationOutcome outcome;
887
888         outcome = new OperationOutcome();
889         oper.setOutcome(outcome, timex);
890         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
891         assertEquals(OperationResult.FAILURE_TIMEOUT, outcome.getResult());
892
893         outcome = new OperationOutcome();
894         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
895         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
896         assertEquals(OperationResult.FAILURE_EXCEPTION, outcome.getResult());
897     }
898
899     @Test
900     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
901         OperationOutcome outcome;
902
903         outcome = new OperationOutcome();
904         oper.setOutcome(outcome, OperationResult.SUCCESS);
905         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
906         assertEquals(OperationResult.SUCCESS, outcome.getResult());
907
908         oper.setOutcome(outcome, OperationResult.SUCCESS);
909         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
910         assertEquals(OperationResult.SUCCESS, outcome.getResult());
911
912         for (OperationResult result : FAILURE_RESULTS) {
913             outcome = new OperationOutcome();
914             oper.setOutcome(outcome, result);
915             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
916             assertEquals(result.toString(), result, outcome.getResult());
917         }
918     }
919
920     @Test
921     public void testMakeOutcome() {
922         oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, MY_TARGET_ENTITY);
923         assertEquals(MY_TARGET_ENTITY, oper.makeOutcome().getTarget());
924     }
925
926     @Test
927     public void testIsTimeout() {
928         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
929
930         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
931         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
932         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
933         assertFalse(oper.isTimeout(new CompletionException(null)));
934         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
935
936         assertTrue(oper.isTimeout(timex));
937         assertTrue(oper.isTimeout(new CompletionException(timex)));
938     }
939
940     @Test
941     public void testLogMessage() {
942         final String infraStr = SINK_INFRA.toString();
943
944         // log structured data
945         appender.clearExtractions();
946         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
947         List<String> output = appender.getExtracted();
948         assertEquals(1, output.size());
949
950         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
951                         .contains("{\n  \"text\": \"my-text\"\n}");
952
953         // repeat with a response
954         appender.clearExtractions();
955         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
956         output = appender.getExtracted();
957         assertEquals(1, output.size());
958
959         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
960                         .contains("{\n  \"text\": \"my-text\"\n}");
961
962         // log a plain string
963         appender.clearExtractions();
964         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
965         output = appender.getExtracted();
966         assertEquals(1, output.size());
967         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
968
969         // log a null request
970         appender.clearExtractions();
971         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
972         output = appender.getExtracted();
973         assertEquals(1, output.size());
974
975         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
976
977         // generate exception from coder
978         setOperCoderException();
979
980         appender.clearExtractions();
981         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
982         output = appender.getExtracted();
983         assertEquals(2, output.size());
984         assertThat(output.get(0)).contains("cannot pretty-print request");
985         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
986
987         // repeat with a response
988         appender.clearExtractions();
989         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
990         output = appender.getExtracted();
991         assertEquals(2, output.size());
992         assertThat(output.get(0)).contains("cannot pretty-print response");
993         assertThat(output.get(1)).contains(MY_SOURCE);
994     }
995
996     @Test
997     public void testGetRetry() {
998         assertEquals(0, oper.getRetry(null));
999         assertEquals(10, oper.getRetry(10));
1000     }
1001
1002     @Test
1003     public void testGetRetryWait() {
1004         // need an operator that doesn't override the retry time
1005         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1006         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1007     }
1008
1009     @Test
1010     public void testGetTimeOutMs() {
1011         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1012
1013         params = params.toBuilder().timeoutSec(null).build();
1014
1015         // new params, thus need a new operation
1016         oper = new MyOper();
1017
1018         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1019     }
1020
1021     private void starter(OperationOutcome oper) {
1022         ++numStart;
1023         tstart = oper.getStart();
1024         opstart = oper;
1025         starts.add(oper);
1026     }
1027
1028     private void completer(OperationOutcome oper) {
1029         ++numEnd;
1030         opend = oper;
1031         ends.add(oper);
1032     }
1033
1034     /**
1035      * Gets a function that does nothing.
1036      *
1037      * @param <T> type of input parameter expected by the function
1038      * @return a function that does nothing
1039      */
1040     private <T> Consumer<T> noop() {
1041         return unused -> {
1042         };
1043     }
1044
1045     /**
1046      * Verifies a run.
1047      *
1048      * @param testName test name
1049      * @param expectedCallbacks number of callbacks expected
1050      * @param expectedOperations number of operation invocations expected
1051      * @param expectedResult expected outcome
1052      */
1053     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1054             OperationResult expectedResult) {
1055
1056         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
1057     }
1058
1059     /**
1060      * Verifies a run.
1061      *
1062      * @param testName test name
1063      * @param expectedCallbacks number of callbacks expected
1064      * @param expectedOperations number of operation invocations expected
1065      * @param expectedResult expected outcome
1066      * @param manipulator function to modify the future returned by
1067      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1068      *        in the executor are run
1069      */
1070     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1071             OperationResult expectedResult, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1072
1073         tstart = null;
1074         opstart = null;
1075         opend = null;
1076         starts.clear();
1077         ends.clear();
1078
1079         CompletableFuture<OperationOutcome> future = oper.start();
1080
1081         manipulator.accept(future);
1082
1083         assertTrue(testName, executor.runAll(MAX_REQUESTS));
1084
1085         assertEquals(testName, expectedCallbacks, numStart);
1086         assertEquals(testName, expectedCallbacks, numEnd);
1087
1088         if (expectedCallbacks > 0) {
1089             assertNotNull(testName, opstart);
1090             assertNotNull(testName, opend);
1091             assertEquals(testName, expectedResult, opend.getResult());
1092
1093             assertSame(testName, tstart, opstart.getStart());
1094             assertSame(testName, tstart, opend.getStart());
1095
1096             try {
1097                 assertTrue(future.isDone());
1098                 assertEquals(testName, opend, future.get());
1099
1100                 // "start" is never final
1101                 for (OperationOutcome outcome : starts) {
1102                     assertFalse(testName, outcome.isFinalOutcome());
1103                 }
1104
1105                 // only the last "complete" is final
1106                 assertTrue(testName, ends.removeLast().isFinalOutcome());
1107
1108                 for (OperationOutcome outcome : ends) {
1109                     assertFalse(outcome.isFinalOutcome());
1110                 }
1111
1112             } catch (InterruptedException | ExecutionException e) {
1113                 throw new IllegalStateException(e);
1114             }
1115
1116             if (expectedOperations > 0) {
1117                 assertNotNull(testName, oper.getSubRequestId());
1118                 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
1119                 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
1120             }
1121         }
1122
1123         assertEquals(testName, expectedOperations, oper.getCount());
1124     }
1125
1126     /**
1127      * Creates a new {@link #oper} whose coder will throw an exception.
1128      */
1129     private void setOperCoderException() {
1130         oper = new MyOper() {
1131             @Override
1132             protected Coder getCoder() {
1133                 return new StandardCoder() {
1134                     @Override
1135                     public String encode(Object object, boolean pretty) throws CoderException {
1136                         throw new CoderException(EXPECTED_EXCEPTION);
1137                     }
1138                 };
1139             }
1140         };
1141     }
1142
1143
1144     @Getter
1145     public static class MyData {
1146         private String text = TEXT;
1147     }
1148
1149
1150     private class MyOper extends OperationPartial {
1151         @Getter
1152         private int count = 0;
1153
1154         @Setter
1155         private boolean genException;
1156         @Setter
1157         private int maxFailures = 0;
1158         @Setter
1159         private CompletableFuture<OperationOutcome> preProc;
1160
1161
1162         public MyOper() {
1163             super(OperationPartialTest.this.params, config, PROP_NAMES);
1164         }
1165
1166         @Override
1167         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1168             ++count;
1169             if (genException) {
1170                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1171             }
1172
1173             operation.setSubRequestId(String.valueOf(attempt));
1174
1175             if (count > maxFailures) {
1176                 operation.setResult(OperationResult.SUCCESS);
1177             } else {
1178                 operation.setResult(OperationResult.FAILURE);
1179             }
1180
1181             return operation;
1182         }
1183
1184         @Override
1185         protected long getRetryWaitMs() {
1186             /*
1187              * Sleep timers run in the background, but we want to control things via the
1188              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1189              */
1190             return 0L;
1191         }
1192     }
1193 }