Make Actors event-agnostic
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.when;
33
34 import ch.qos.logback.classic.Logger;
35 import java.time.Instant;
36 import java.util.ArrayDeque;
37 import java.util.Arrays;
38 import java.util.Collections;
39 import java.util.Deque;
40 import java.util.LinkedList;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.UUID;
45 import java.util.concurrent.CompletableFuture;
46 import java.util.concurrent.CompletionException;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.ForkJoinPool;
49 import java.util.concurrent.Future;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import java.util.concurrent.atomic.AtomicReference;
53 import java.util.function.Consumer;
54 import java.util.function.Supplier;
55 import java.util.stream.Collectors;
56 import lombok.Getter;
57 import lombok.Setter;
58 import org.junit.AfterClass;
59 import org.junit.Before;
60 import org.junit.BeforeClass;
61 import org.junit.Test;
62 import org.mockito.Mock;
63 import org.mockito.MockitoAnnotations;
64 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
65 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
66 import org.onap.policy.common.utils.coder.Coder;
67 import org.onap.policy.common.utils.coder.CoderException;
68 import org.onap.policy.common.utils.coder.StandardCoder;
69 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
70 import org.onap.policy.common.utils.time.PseudoExecutor;
71 import org.onap.policy.controlloop.ControlLoopOperation;
72 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
73 import org.onap.policy.controlloop.actorserviceprovider.Operation;
74 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
75 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
76 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
77 import org.onap.policy.controlloop.actorserviceprovider.Operator;
78 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
79 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
80 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
81 import org.slf4j.LoggerFactory;
82
83 public class OperationPartialTest {
84     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
85     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
86     private static final int MAX_REQUESTS = 100;
87     private static final int MAX_PARALLEL = 10;
88     private static final String EXPECTED_EXCEPTION = "expected exception";
89     private static final String ACTOR = "my-actor";
90     private static final String OPERATION = "my-operation";
91     private static final String MY_SINK = "my-sink";
92     private static final String MY_SOURCE = "my-source";
93     private static final String MY_TARGET_ENTITY = "my-entity";
94     private static final String TEXT = "my-text";
95     private static final int TIMEOUT = 1000;
96     private static final UUID REQ_ID = UUID.randomUUID();
97
98     private static final List<OperationResult> FAILURE_RESULTS = Arrays.asList(OperationResult.values()).stream()
99                     .filter(result -> result != OperationResult.SUCCESS).collect(Collectors.toList());
100
101     /**
102      * Used to attach an appender to the class' logger.
103      */
104     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
105     private static final ExtractAppender appender = new ExtractAppender();
106
107     private static final List<String> PROP_NAMES = List.of("hello", "world");
108
109     @Mock
110     private ActorService service;
111     @Mock
112     private Actor guardActor;
113     @Mock
114     private Operator guardOperator;
115     @Mock
116     private Operation guardOperation;
117
118     private PseudoExecutor executor;
119     private ControlLoopOperationParams params;
120
121     private MyOper oper;
122
123     private int numStart;
124     private int numEnd;
125
126     private Instant tstart;
127
128     private OperationOutcome opstart;
129     private OperationOutcome opend;
130
131     private Deque<OperationOutcome> starts;
132     private Deque<OperationOutcome> ends;
133
134     private OperatorConfig config;
135
136     /**
137      * Attaches the appender to the logger.
138      */
139     @BeforeClass
140     public static void setUpBeforeClass() throws Exception {
141         /*
142          * Attach appender to the logger.
143          */
144         appender.setContext(logger.getLoggerContext());
145         appender.start();
146
147         logger.addAppender(appender);
148     }
149
150     /**
151      * Stops the appender.
152      */
153     @AfterClass
154     public static void tearDownAfterClass() {
155         appender.stop();
156     }
157
158     /**
159      * Initializes the fields, including {@link #oper}.
160      */
161     @Before
162     public void setUp() {
163         MockitoAnnotations.initMocks(this);
164         executor = new PseudoExecutor();
165
166         params = ControlLoopOperationParams.builder().completeCallback(this::completer).requestId(REQ_ID)
167                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
168                         .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
169
170         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
171         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
172         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
173         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
174
175         config = new OperatorConfig(executor);
176
177         oper = new MyOper();
178
179         tstart = null;
180
181         opstart = null;
182         opend = null;
183
184         starts = new ArrayDeque<>(10);
185         ends = new ArrayDeque<>(10);
186     }
187
188     @Test
189     public void testOperatorPartial_testGetActorName_testGetName() {
190         assertEquals(ACTOR, oper.getActorName());
191         assertEquals(OPERATION, oper.getName());
192         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
193     }
194
195     @Test
196     public void testGetBlockingThread() throws Exception {
197         CompletableFuture<Void> future = new CompletableFuture<>();
198
199         // use the real executor
200         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
201             @Override
202             public Operation buildOperation(ControlLoopOperationParams params) {
203                 return null;
204             }
205         };
206
207         oper2.getBlockingExecutor().execute(() -> future.complete(null));
208
209         assertNull(future.get(5, TimeUnit.SECONDS));
210     }
211
212     @Test
213     public void testGetPropertyNames() {
214         assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
215     }
216
217     @Test
218     public void testGetProperty_testSetProperty_testGetRequiredProperty() {
219         oper.setProperty("propertyA", "valueA");
220         oper.setProperty("propertyB", "valueB");
221         oper.setProperty("propertyC", 20);
222         oper.setProperty("propertyD", "valueD");
223
224         assertEquals("valueA", oper.getProperty("propertyA"));
225         assertEquals("valueB", oper.getProperty("propertyB"));
226         assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
227
228         assertEquals("valueD", oper.getRequiredProperty("propertyD", "typeD"));
229
230         assertThatIllegalStateException().isThrownBy(() -> oper.getRequiredProperty("propertyUnknown", "some type"))
231                         .withMessage("missing some type");
232     }
233
234     @Test
235     public void testStart() {
236         verifyRun("testStart", 1, 1, OperationResult.SUCCESS);
237     }
238
239     /**
240      * Tests start() with multiple running requests.
241      */
242     @Test
243     public void testStartMultiple() {
244         for (int count = 0; count < MAX_PARALLEL; ++count) {
245             oper.start();
246         }
247
248         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
249
250         assertNotNull(opstart);
251         assertNotNull(opend);
252         assertEquals(OperationResult.SUCCESS, opend.getResult());
253
254         assertEquals(MAX_PARALLEL, numStart);
255         assertEquals(MAX_PARALLEL, oper.getCount());
256         assertEquals(MAX_PARALLEL, numEnd);
257     }
258
259     @Test
260     public void testStartOperationAsync() {
261         oper.start();
262         assertTrue(executor.runAll(MAX_REQUESTS));
263
264         assertEquals(1, oper.getCount());
265     }
266
267     @Test
268     public void testIsSuccess() {
269         assertFalse(oper.isSuccess(null));
270
271         OperationOutcome outcome = new OperationOutcome();
272
273         outcome.setResult(OperationResult.SUCCESS);
274         assertTrue(oper.isSuccess(outcome));
275
276         for (OperationResult failure : FAILURE_RESULTS) {
277             outcome.setResult(failure);
278             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
279         }
280     }
281
282     @Test
283     public void testIsActorFailed() {
284         assertFalse(oper.isActorFailed(null));
285
286         OperationOutcome outcome = params.makeOutcome(null);
287
288         // incorrect outcome
289         outcome.setResult(OperationResult.SUCCESS);
290         assertFalse(oper.isActorFailed(outcome));
291
292         outcome.setResult(OperationResult.FAILURE_RETRIES);
293         assertFalse(oper.isActorFailed(outcome));
294
295         // correct outcome
296         outcome.setResult(OperationResult.FAILURE);
297
298         // incorrect actor
299         outcome.setActor(MY_SINK);
300         assertFalse(oper.isActorFailed(outcome));
301         outcome.setActor(null);
302         assertFalse(oper.isActorFailed(outcome));
303         outcome.setActor(ACTOR);
304
305         // incorrect operation
306         outcome.setOperation(MY_SINK);
307         assertFalse(oper.isActorFailed(outcome));
308         outcome.setOperation(null);
309         assertFalse(oper.isActorFailed(outcome));
310         outcome.setOperation(OPERATION);
311
312         // correct values
313         assertTrue(oper.isActorFailed(outcome));
314     }
315
316     @Test
317     public void testDoOperation() {
318         /*
319          * Use an operation that doesn't override doOperation().
320          */
321         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
322
323         oper2.start();
324         assertTrue(executor.runAll(MAX_REQUESTS));
325
326         assertNotNull(opend);
327         assertEquals(OperationResult.FAILURE_EXCEPTION, opend.getResult());
328     }
329
330     @Test
331     public void testTimeout() throws Exception {
332
333         // use a real executor
334         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
335
336         // trigger timeout very quickly
337         oper = new MyOper() {
338             @Override
339             protected long getTimeoutMs(Integer timeoutSec) {
340                 return 1;
341             }
342
343             @Override
344             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
345
346                 OperationOutcome outcome2 = params.makeOutcome(null);
347                 outcome2.setResult(OperationResult.SUCCESS);
348
349                 /*
350                  * Create an incomplete future that will timeout after the operation's
351                  * timeout. If it fires before the other timer, then it will return a
352                  * SUCCESS outcome.
353                  */
354                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
355                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
356                                 params.getExecutor());
357
358                 return future;
359             }
360         };
361
362         assertEquals(OperationResult.FAILURE_TIMEOUT, oper.start().get().getResult());
363     }
364
365     /**
366      * Tests retry functions, when the count is set to zero and retries are exhausted.
367      */
368     @Test
369     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
370         params = params.toBuilder().retry(0).build();
371
372         // new params, thus need a new operation
373         oper = new MyOper();
374
375         oper.setMaxFailures(10);
376
377         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, OperationResult.FAILURE);
378     }
379
380     /**
381      * Tests retry functions, when the count is null and retries are exhausted.
382      */
383     @Test
384     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
385         params = params.toBuilder().retry(null).build();
386
387         // new params, thus need a new operation
388         oper = new MyOper();
389
390         oper.setMaxFailures(10);
391
392         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, OperationResult.FAILURE);
393     }
394
395     /**
396      * Tests retry functions, when retries are exhausted.
397      */
398     @Test
399     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
400         final int maxRetries = 3;
401         params = params.toBuilder().retry(maxRetries).build();
402
403         // new params, thus need a new operation
404         oper = new MyOper();
405
406         oper.setMaxFailures(10);
407
408         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
409                         OperationResult.FAILURE_RETRIES);
410     }
411
412     /**
413      * Tests retry functions, when a success follows some retries.
414      */
415     @Test
416     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
417         params = params.toBuilder().retry(10).build();
418
419         // new params, thus need a new operation
420         oper = new MyOper();
421
422         final int maxFailures = 3;
423         oper.setMaxFailures(maxFailures);
424
425         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
426                         OperationResult.SUCCESS);
427     }
428
429     /**
430      * Tests retry functions, when the outcome is {@code null}.
431      */
432     @Test
433     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
434
435         // arrange to return null from doOperation()
436         oper = new MyOper() {
437             @Override
438             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
439
440                 // update counters
441                 super.doOperation(attempt, outcome);
442                 return null;
443             }
444         };
445
446         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, OperationResult.FAILURE, noop());
447     }
448
449     @Test
450     public void testSleep() throws Exception {
451         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
452         assertTrue(future.isDone());
453         assertNull(future.get());
454
455         // edge case
456         future = oper.sleep(0, TimeUnit.SECONDS);
457         assertTrue(future.isDone());
458         assertNull(future.get());
459
460         /*
461          * Start a second sleep we can use to check the first while it's running.
462          */
463         tstart = Instant.now();
464         future = oper.sleep(100, TimeUnit.MILLISECONDS);
465
466         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
467
468         // wait for second to complete and verify that the first has not completed
469         future2.get();
470         assertFalse(future.isDone());
471
472         // wait for second to complete
473         future.get();
474
475         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
476         assertTrue(diff >= 99);
477     }
478
479     @Test
480     public void testIsSameOperation() {
481         assertFalse(oper.isSameOperation(null));
482
483         OperationOutcome outcome = params.makeOutcome(null);
484
485         // wrong actor - should be false
486         outcome.setActor(null);
487         assertFalse(oper.isSameOperation(outcome));
488         outcome.setActor(MY_SINK);
489         assertFalse(oper.isSameOperation(outcome));
490         outcome.setActor(ACTOR);
491
492         // wrong operation - should be null
493         outcome.setOperation(null);
494         assertFalse(oper.isSameOperation(outcome));
495         outcome.setOperation(MY_SINK);
496         assertFalse(oper.isSameOperation(outcome));
497         outcome.setOperation(OPERATION);
498
499         assertTrue(oper.isSameOperation(outcome));
500     }
501
502     @Test
503     public void testFromException() {
504         // arrange to generate an exception when operation runs
505         oper.setGenException(true);
506
507         verifyRun("testFromException", 1, 1, OperationResult.FAILURE_EXCEPTION);
508     }
509
510     /**
511      * Tests fromException() when there is no exception.
512      */
513     @Test
514     public void testFromExceptionNoExcept() {
515         verifyRun("testFromExceptionNoExcept", 1, 1, OperationResult.SUCCESS);
516     }
517
518     /**
519      * Tests both flavors of anyOf(), because one invokes the other.
520      */
521     @Test
522     public void testAnyOf() throws Exception {
523         // first task completes, others do not
524         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
525
526         final OperationOutcome outcome = params.makeOutcome(null);
527
528         tasks.add(() -> CompletableFuture.completedFuture(outcome));
529         tasks.add(() -> new CompletableFuture<>());
530         tasks.add(() -> null);
531         tasks.add(() -> new CompletableFuture<>());
532
533         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
534         assertTrue(executor.runAll(MAX_REQUESTS));
535         assertTrue(result.isDone());
536         assertSame(outcome, result.get());
537
538         // repeat using array form
539         @SuppressWarnings("unchecked")
540         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
541         result = oper.anyOf(tasks.toArray(taskArray));
542         assertTrue(executor.runAll(MAX_REQUESTS));
543         assertTrue(result.isDone());
544         assertSame(outcome, result.get());
545
546         // second task completes, others do not
547         tasks.clear();
548         tasks.add(() -> new CompletableFuture<>());
549         tasks.add(() -> CompletableFuture.completedFuture(outcome));
550         tasks.add(() -> new CompletableFuture<>());
551
552         result = oper.anyOf(tasks);
553         assertTrue(executor.runAll(MAX_REQUESTS));
554         assertTrue(result.isDone());
555         assertSame(outcome, result.get());
556
557         // third task completes, others do not
558         tasks.clear();
559         tasks.add(() -> new CompletableFuture<>());
560         tasks.add(() -> new CompletableFuture<>());
561         tasks.add(() -> CompletableFuture.completedFuture(outcome));
562
563         result = oper.anyOf(tasks);
564         assertTrue(executor.runAll(MAX_REQUESTS));
565         assertTrue(result.isDone());
566         assertSame(outcome, result.get());
567     }
568
569     /**
570      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
571      */
572     @Test
573     @SuppressWarnings("unchecked")
574     public void testAnyOfEdge() throws Exception {
575         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
576
577         // zero items: check both using a list and using an array
578         assertNull(oper.anyOf(tasks));
579         assertNull(oper.anyOf());
580
581         // one item: : check both using a list and using an array
582         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
583         tasks.add(() -> future1);
584
585         assertSame(future1, oper.anyOf(tasks));
586         assertSame(future1, oper.anyOf(() -> future1));
587     }
588
589     @Test
590     public void testAllOfArray() throws Exception {
591         final OperationOutcome outcome = params.makeOutcome(null);
592
593         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
594         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
595         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
596
597         @SuppressWarnings("unchecked")
598         CompletableFuture<OperationOutcome> result =
599                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
600
601         assertTrue(executor.runAll(MAX_REQUESTS));
602         assertFalse(result.isDone());
603         future1.complete(outcome);
604
605         // complete 3 before 2
606         assertTrue(executor.runAll(MAX_REQUESTS));
607         assertFalse(result.isDone());
608         future3.complete(outcome);
609
610         assertTrue(executor.runAll(MAX_REQUESTS));
611         assertFalse(result.isDone());
612         future2.complete(outcome);
613
614         // all of them are now done
615         assertTrue(executor.runAll(MAX_REQUESTS));
616         assertTrue(result.isDone());
617         assertSame(outcome, result.get());
618     }
619
620     @Test
621     public void testAllOfList() throws Exception {
622         final OperationOutcome outcome = params.makeOutcome(null);
623
624         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
625         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
626         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
627
628         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
629         tasks.add(() -> future1);
630         tasks.add(() -> future2);
631         tasks.add(() -> null);
632         tasks.add(() -> future3);
633
634         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
635
636         assertTrue(executor.runAll(MAX_REQUESTS));
637         assertFalse(result.isDone());
638         future1.complete(outcome);
639
640         // complete 3 before 2
641         assertTrue(executor.runAll(MAX_REQUESTS));
642         assertFalse(result.isDone());
643         future3.complete(outcome);
644
645         assertTrue(executor.runAll(MAX_REQUESTS));
646         assertFalse(result.isDone());
647         future2.complete(outcome);
648
649         // all of them are now done
650         assertTrue(executor.runAll(MAX_REQUESTS));
651         assertTrue(result.isDone());
652         assertSame(outcome, result.get());
653     }
654
655     /**
656      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
657      */
658     @Test
659     @SuppressWarnings("unchecked")
660     public void testAllOfEdge() throws Exception {
661         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
662
663         // zero items: check both using a list and using an array
664         assertNull(oper.allOf(tasks));
665         assertNull(oper.allOf());
666
667         // one item: : check both using a list and using an array
668         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
669         tasks.add(() -> future1);
670
671         assertSame(future1, oper.allOf(tasks));
672         assertSame(future1, oper.allOf(() -> future1));
673     }
674
675     @Test
676     public void testAttachFutures() throws Exception {
677         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
678
679         // third task throws an exception during construction
680         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
681         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
682         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
683         tasks.add(() -> future1);
684         tasks.add(() -> future2);
685         tasks.add(() -> {
686             throw new IllegalStateException(EXPECTED_EXCEPTION);
687         });
688         tasks.add(() -> future3);
689
690         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
691
692         // should have canceled the first two, but not the last
693         assertTrue(future1.isCancelled());
694         assertTrue(future2.isCancelled());
695         assertFalse(future3.isCancelled());
696     }
697
698     @Test
699     public void testCombineOutcomes() throws Exception {
700         // only one outcome
701         verifyOutcomes(0, OperationResult.SUCCESS);
702         verifyOutcomes(0, OperationResult.FAILURE_EXCEPTION);
703
704         // maximum is in different positions
705         verifyOutcomes(0, OperationResult.FAILURE, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD);
706         verifyOutcomes(1, OperationResult.SUCCESS, OperationResult.FAILURE, OperationResult.FAILURE_GUARD);
707         verifyOutcomes(2, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD, OperationResult.FAILURE);
708
709         // null outcome - takes precedence over a success
710         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
711         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
712         tasks.add(() -> CompletableFuture.completedFuture(null));
713         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
714         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
715
716         assertTrue(executor.runAll(MAX_REQUESTS));
717         assertTrue(result.isDone());
718         assertNull(result.get());
719
720         // one throws an exception during execution
721         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
722
723         tasks.clear();
724         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
725         tasks.add(() -> CompletableFuture.failedFuture(except));
726         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
727         result = oper.allOf(tasks);
728
729         assertTrue(executor.runAll(MAX_REQUESTS));
730         assertTrue(result.isCompletedExceptionally());
731         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
732     }
733
734     /**
735      * Tests both flavors of sequence(), because one invokes the other.
736      */
737     @Test
738     public void testSequence() throws Exception {
739         final OperationOutcome outcome = params.makeOutcome(null);
740
741         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
742         tasks.add(() -> CompletableFuture.completedFuture(outcome));
743         tasks.add(() -> null);
744         tasks.add(() -> CompletableFuture.completedFuture(outcome));
745         tasks.add(() -> CompletableFuture.completedFuture(outcome));
746
747         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
748         assertTrue(executor.runAll(MAX_REQUESTS));
749         assertTrue(result.isDone());
750         assertSame(outcome, result.get());
751
752         // repeat using array form
753         @SuppressWarnings("unchecked")
754         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
755         result = oper.sequence(tasks.toArray(taskArray));
756         assertTrue(executor.runAll(MAX_REQUESTS));
757         assertTrue(result.isDone());
758         assertSame(outcome, result.get());
759
760         // second task fails, third should not run
761         OperationOutcome failure = params.makeOutcome(null);
762         failure.setResult(OperationResult.FAILURE);
763         tasks.clear();
764         tasks.add(() -> CompletableFuture.completedFuture(outcome));
765         tasks.add(() -> CompletableFuture.completedFuture(failure));
766         tasks.add(() -> CompletableFuture.completedFuture(outcome));
767
768         result = oper.sequence(tasks);
769         assertTrue(executor.runAll(MAX_REQUESTS));
770         assertTrue(result.isDone());
771         assertSame(failure, result.get());
772     }
773
774     /**
775      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
776      */
777     @Test
778     @SuppressWarnings("unchecked")
779     public void testSequenceEdge() throws Exception {
780         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
781
782         // zero items: check both using a list and using an array
783         assertNull(oper.sequence(tasks));
784         assertNull(oper.sequence());
785
786         // one item: : check both using a list and using an array
787         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
788         tasks.add(() -> future1);
789
790         assertSame(future1, oper.sequence(tasks));
791         assertSame(future1, oper.sequence(() -> future1));
792     }
793
794     private void verifyOutcomes(int expected, OperationResult... results) throws Exception {
795         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
796
797         OperationOutcome expectedOutcome = null;
798
799         for (int count = 0; count < results.length; ++count) {
800             OperationOutcome outcome = params.makeOutcome(null);
801             outcome.setResult(results[count]);
802             tasks.add(() -> CompletableFuture.completedFuture(outcome));
803
804             if (count == expected) {
805                 expectedOutcome = outcome;
806             }
807         }
808
809         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
810
811         assertTrue(executor.runAll(MAX_REQUESTS));
812         assertTrue(result.isDone());
813         assertSame(expectedOutcome, result.get());
814     }
815
816     @Test
817     public void testDetmPriority() throws CoderException {
818         assertEquals(1, oper.detmPriority(null));
819
820         OperationOutcome outcome = params.makeOutcome(null);
821
822         Map<OperationResult, Integer> map = Map.of(OperationResult.SUCCESS, 0, OperationResult.FAILURE_GUARD, 2,
823                 OperationResult.FAILURE_RETRIES, 3, OperationResult.FAILURE, 4, OperationResult.FAILURE_TIMEOUT, 5,
824                 OperationResult.FAILURE_EXCEPTION, 6);
825
826         for (Entry<OperationResult, Integer> ent : map.entrySet()) {
827             outcome.setResult(ent.getKey());
828             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
829         }
830
831         /*
832          * Test null result. We can't actually set it to null, because the set() method
833          * won't allow it. Instead, we decode it from a structure.
834          */
835         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
836         assertEquals(1, oper.detmPriority(outcome));
837     }
838
839     /**
840      * Tests callbackStarted() when the pipeline has already been stopped.
841      */
842     @Test
843     public void testCallbackStartedNotRunning() {
844         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
845
846         /*
847          * arrange to stop the controller when the start-callback is invoked, but capture
848          * the outcome
849          */
850         params = params.toBuilder().startCallback(oper -> {
851             starter(oper);
852             future.get().cancel(false);
853         }).build();
854
855         // new params, thus need a new operation
856         oper = new MyOper();
857
858         future.set(oper.start());
859         assertTrue(executor.runAll(MAX_REQUESTS));
860
861         // should have only run once
862         assertEquals(1, numStart);
863     }
864
865     /**
866      * Tests callbackCompleted() when the pipeline has already been stopped.
867      */
868     @Test
869     public void testCallbackCompletedNotRunning() {
870         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
871
872         // arrange to stop the controller when the start-callback is invoked
873         params = params.toBuilder().startCallback(oper -> {
874             future.get().cancel(false);
875         }).build();
876
877         // new params, thus need a new operation
878         oper = new MyOper();
879
880         future.set(oper.start());
881         assertTrue(executor.runAll(MAX_REQUESTS));
882
883         // should not have been set
884         assertNull(opend);
885         assertEquals(0, numEnd);
886     }
887
888     @Test
889     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
890         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
891
892         OperationOutcome outcome;
893
894         outcome = new OperationOutcome();
895         oper.setOutcome(outcome, timex);
896         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
897         assertEquals(OperationResult.FAILURE_TIMEOUT, outcome.getResult());
898
899         outcome = new OperationOutcome();
900         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
901         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
902         assertEquals(OperationResult.FAILURE_EXCEPTION, outcome.getResult());
903     }
904
905     @Test
906     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
907         OperationOutcome outcome;
908
909         outcome = new OperationOutcome();
910         oper.setOutcome(outcome, OperationResult.SUCCESS);
911         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
912         assertEquals(OperationResult.SUCCESS, outcome.getResult());
913
914         oper.setOutcome(outcome, OperationResult.SUCCESS);
915         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
916         assertEquals(OperationResult.SUCCESS, outcome.getResult());
917
918         for (OperationResult result : FAILURE_RESULTS) {
919             outcome = new OperationOutcome();
920             oper.setOutcome(outcome, result);
921             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
922             assertEquals(result.toString(), result, outcome.getResult());
923         }
924     }
925
926     @Test
927     public void testIsTimeout() {
928         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
929
930         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
931         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
932         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
933         assertFalse(oper.isTimeout(new CompletionException(null)));
934         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
935
936         assertTrue(oper.isTimeout(timex));
937         assertTrue(oper.isTimeout(new CompletionException(timex)));
938     }
939
940     @Test
941     public void testLogMessage() {
942         final String infraStr = SINK_INFRA.toString();
943
944         // log structured data
945         appender.clearExtractions();
946         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
947         List<String> output = appender.getExtracted();
948         assertEquals(1, output.size());
949
950         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
951                         .contains("{\n  \"text\": \"my-text\"\n}");
952
953         // repeat with a response
954         appender.clearExtractions();
955         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
956         output = appender.getExtracted();
957         assertEquals(1, output.size());
958
959         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
960                         .contains("{\n  \"text\": \"my-text\"\n}");
961
962         // log a plain string
963         appender.clearExtractions();
964         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
965         output = appender.getExtracted();
966         assertEquals(1, output.size());
967         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
968
969         // log a null request
970         appender.clearExtractions();
971         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
972         output = appender.getExtracted();
973         assertEquals(1, output.size());
974
975         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
976
977         // generate exception from coder
978         setOperCoderException();
979
980         appender.clearExtractions();
981         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
982         output = appender.getExtracted();
983         assertEquals(2, output.size());
984         assertThat(output.get(0)).contains("cannot pretty-print request");
985         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
986
987         // repeat with a response
988         appender.clearExtractions();
989         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
990         output = appender.getExtracted();
991         assertEquals(2, output.size());
992         assertThat(output.get(0)).contains("cannot pretty-print response");
993         assertThat(output.get(1)).contains(MY_SOURCE);
994     }
995
996     @Test
997     public void testGetRetry() {
998         assertEquals(0, oper.getRetry(null));
999         assertEquals(10, oper.getRetry(10));
1000     }
1001
1002     @Test
1003     public void testGetRetryWait() {
1004         // need an operator that doesn't override the retry time
1005         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1006         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1007     }
1008
1009     @Test
1010     public void testGetTargetEntity() {
1011         // get it from the params
1012         assertEquals(MY_TARGET_ENTITY, oper.getTargetEntity());
1013
1014         // now get it from the properties
1015         oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, "entityX");
1016         assertEquals("entityX", oper.getTargetEntity());
1017     }
1018
1019     @Test
1020     public void testGetTimeOutMs() {
1021         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1022
1023         params = params.toBuilder().timeoutSec(null).build();
1024
1025         // new params, thus need a new operation
1026         oper = new MyOper();
1027
1028         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1029     }
1030
1031     private void starter(OperationOutcome oper) {
1032         ++numStart;
1033         tstart = oper.getStart();
1034         opstart = oper;
1035         starts.add(oper);
1036     }
1037
1038     private void completer(OperationOutcome oper) {
1039         ++numEnd;
1040         opend = oper;
1041         ends.add(oper);
1042     }
1043
1044     /**
1045      * Gets a function that does nothing.
1046      *
1047      * @param <T> type of input parameter expected by the function
1048      * @return a function that does nothing
1049      */
1050     private <T> Consumer<T> noop() {
1051         return unused -> {
1052         };
1053     }
1054
1055     private OperationOutcome makeSuccess() {
1056         OperationOutcome outcome = params.makeOutcome(null);
1057         outcome.setResult(OperationResult.SUCCESS);
1058
1059         return outcome;
1060     }
1061
1062     /**
1063      * Verifies a run.
1064      *
1065      * @param testName test name
1066      * @param expectedCallbacks number of callbacks expected
1067      * @param expectedOperations number of operation invocations expected
1068      * @param expectedResult expected outcome
1069      */
1070     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1071             OperationResult expectedResult) {
1072
1073         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
1074     }
1075
1076     /**
1077      * Verifies a run.
1078      *
1079      * @param testName test name
1080      * @param expectedCallbacks number of callbacks expected
1081      * @param expectedOperations number of operation invocations expected
1082      * @param expectedResult expected outcome
1083      * @param manipulator function to modify the future returned by
1084      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1085      *        in the executor are run
1086      */
1087     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1088             OperationResult expectedResult, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1089
1090         tstart = null;
1091         opstart = null;
1092         opend = null;
1093         starts.clear();
1094         ends.clear();
1095
1096         CompletableFuture<OperationOutcome> future = oper.start();
1097
1098         manipulator.accept(future);
1099
1100         assertTrue(testName, executor.runAll(MAX_REQUESTS));
1101
1102         assertEquals(testName, expectedCallbacks, numStart);
1103         assertEquals(testName, expectedCallbacks, numEnd);
1104
1105         if (expectedCallbacks > 0) {
1106             assertNotNull(testName, opstart);
1107             assertNotNull(testName, opend);
1108             assertEquals(testName, expectedResult, opend.getResult());
1109
1110             assertSame(testName, tstart, opstart.getStart());
1111             assertSame(testName, tstart, opend.getStart());
1112
1113             try {
1114                 assertTrue(future.isDone());
1115                 assertEquals(testName, opend, future.get());
1116
1117                 // "start" is never final
1118                 for (OperationOutcome outcome : starts) {
1119                     assertFalse(testName, outcome.isFinalOutcome());
1120                 }
1121
1122                 // only the last "complete" is final
1123                 assertTrue(testName, ends.removeLast().isFinalOutcome());
1124
1125                 for (OperationOutcome outcome : ends) {
1126                     assertFalse(outcome.isFinalOutcome());
1127                 }
1128
1129             } catch (InterruptedException | ExecutionException e) {
1130                 throw new IllegalStateException(e);
1131             }
1132
1133             if (expectedOperations > 0) {
1134                 assertNotNull(testName, oper.getSubRequestId());
1135                 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
1136                 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
1137             }
1138         }
1139
1140         assertEquals(testName, expectedOperations, oper.getCount());
1141     }
1142
1143     /**
1144      * Creates a new {@link #oper} whose coder will throw an exception.
1145      */
1146     private void setOperCoderException() {
1147         oper = new MyOper() {
1148             @Override
1149             protected Coder getCoder() {
1150                 return new StandardCoder() {
1151                     @Override
1152                     public String encode(Object object, boolean pretty) throws CoderException {
1153                         throw new CoderException(EXPECTED_EXCEPTION);
1154                     }
1155                 };
1156             }
1157         };
1158     }
1159
1160
1161     @Getter
1162     public static class MyData {
1163         private String text = TEXT;
1164     }
1165
1166
1167     private class MyOper extends OperationPartial {
1168         @Getter
1169         private int count = 0;
1170
1171         @Setter
1172         private boolean genException;
1173         @Setter
1174         private int maxFailures = 0;
1175         @Setter
1176         private CompletableFuture<OperationOutcome> preProc;
1177
1178
1179         public MyOper() {
1180             super(OperationPartialTest.this.params, config, PROP_NAMES);
1181         }
1182
1183         @Override
1184         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1185             ++count;
1186             if (genException) {
1187                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1188             }
1189
1190             operation.setSubRequestId(String.valueOf(attempt));
1191
1192             if (count > maxFailures) {
1193                 operation.setResult(OperationResult.SUCCESS);
1194             } else {
1195                 operation.setResult(OperationResult.FAILURE);
1196             }
1197
1198             return operation;
1199         }
1200
1201         @Override
1202         protected long getRetryWaitMs() {
1203             /*
1204              * Sleep timers run in the background, but we want to control things via the
1205              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1206              */
1207             return 0L;
1208         }
1209     }
1210 }