Remove targetEntity from makeOutcome
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.when;
33
34 import ch.qos.logback.classic.Logger;
35 import java.time.Instant;
36 import java.util.ArrayDeque;
37 import java.util.Arrays;
38 import java.util.Collections;
39 import java.util.Deque;
40 import java.util.LinkedList;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.UUID;
45 import java.util.concurrent.CompletableFuture;
46 import java.util.concurrent.CompletionException;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.ForkJoinPool;
49 import java.util.concurrent.Future;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import java.util.concurrent.atomic.AtomicReference;
53 import java.util.function.Consumer;
54 import java.util.function.Supplier;
55 import java.util.stream.Collectors;
56 import lombok.Getter;
57 import lombok.Setter;
58 import org.junit.AfterClass;
59 import org.junit.Before;
60 import org.junit.BeforeClass;
61 import org.junit.Test;
62 import org.mockito.Mock;
63 import org.mockito.MockitoAnnotations;
64 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
65 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
66 import org.onap.policy.common.utils.coder.Coder;
67 import org.onap.policy.common.utils.coder.CoderException;
68 import org.onap.policy.common.utils.coder.StandardCoder;
69 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
70 import org.onap.policy.common.utils.time.PseudoExecutor;
71 import org.onap.policy.controlloop.ControlLoopOperation;
72 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
73 import org.onap.policy.controlloop.actorserviceprovider.Operation;
74 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
75 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
76 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
77 import org.onap.policy.controlloop.actorserviceprovider.Operator;
78 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
79 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
80 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
81 import org.slf4j.LoggerFactory;
82
83 public class OperationPartialTest {
84     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
85     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
86     private static final int MAX_REQUESTS = 100;
87     private static final int MAX_PARALLEL = 10;
88     private static final String EXPECTED_EXCEPTION = "expected exception";
89     private static final String ACTOR = "my-actor";
90     private static final String OPERATION = "my-operation";
91     private static final String MY_SINK = "my-sink";
92     private static final String MY_SOURCE = "my-source";
93     private static final String MY_TARGET_ENTITY = "my-entity";
94     private static final String TEXT = "my-text";
95     private static final int TIMEOUT = 1000;
96     private static final UUID REQ_ID = UUID.randomUUID();
97
98     private static final List<OperationResult> FAILURE_RESULTS = Arrays.asList(OperationResult.values()).stream()
99                     .filter(result -> result != OperationResult.SUCCESS).collect(Collectors.toList());
100
101     /**
102      * Used to attach an appender to the class' logger.
103      */
104     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
105     private static final ExtractAppender appender = new ExtractAppender();
106
107     private static final List<String> PROP_NAMES = List.of("hello", "world");
108
    // mocks; setUp() wires them so that the actor service hands out the guard
    // actor, which hands out the guard operator, which builds the guard operation
    @Mock
    private ActorService service;
    @Mock
    private Actor guardActor;
    @Mock
    private Operator guardOperator;
    @Mock
    private Operation guardOperation;

    // executor whose queued tasks are run explicitly by the tests via runAll()
    private PseudoExecutor executor;
    private ControlLoopOperationParams params;

    // operation under test, re-created by setUp()
    private MyOper oper;

    // callback counters checked by the tests
    // NOTE(review): presumably incremented by the start/complete callbacks - confirm
    private int numStart;
    private int numEnd;

    // start time, recorded by testSleep()
    private Instant tstart;

    // outcomes checked by the tests
    // NOTE(review): presumably the latest outcomes seen by the callbacks - confirm
    private OperationOutcome opstart;
    private OperationOutcome opend;

    private Deque<OperationOutcome> starts;
    private Deque<OperationOutcome> ends;

    private OperatorConfig config;
135
136     /**
137      * Attaches the appender to the logger.
138      */
139     @BeforeClass
140     public static void setUpBeforeClass() throws Exception {
141         /*
142          * Attach appender to the logger.
143          */
144         appender.setContext(logger.getLoggerContext());
145         appender.start();
146
147         logger.addAppender(appender);
148     }
149
150     /**
151      * Stops the appender.
152      */
153     @AfterClass
154     public static void tearDownAfterClass() {
155         appender.stop();
156     }
157
158     /**
159      * Initializes the fields, including {@link #oper}.
160      */
161     @Before
162     public void setUp() {
163         MockitoAnnotations.initMocks(this);
164         executor = new PseudoExecutor();
165
166         params = ControlLoopOperationParams.builder().completeCallback(this::completer).requestId(REQ_ID)
167                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
168                         .startCallback(this::starter).build();
169
170         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
171         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
172         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
173         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
174
175         config = new OperatorConfig(executor);
176
177         oper = new MyOper();
178
179         tstart = null;
180
181         opstart = null;
182         opend = null;
183
184         starts = new ArrayDeque<>(10);
185         ends = new ArrayDeque<>(10);
186     }
187
188     @Test
189     public void testOperatorPartial_testGetActorName_testGetName() {
190         assertEquals(ACTOR, oper.getActorName());
191         assertEquals(OPERATION, oper.getName());
192         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
193     }
194
195     @Test
196     public void testGetBlockingThread() throws Exception {
197         CompletableFuture<Void> future = new CompletableFuture<>();
198
199         // use the real executor
200         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
201             @Override
202             public Operation buildOperation(ControlLoopOperationParams params) {
203                 return null;
204             }
205         };
206
207         oper2.getBlockingExecutor().execute(() -> future.complete(null));
208
209         assertNull(future.get(5, TimeUnit.SECONDS));
210     }
211
212     @Test
213     public void testGetPropertyNames() {
214         assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
215     }
216
217     @Test
218     public void testGetProperty_testSetProperty_testGetRequiredProperty() {
219         oper.setProperty("propertyA", "valueA");
220         oper.setProperty("propertyB", "valueB");
221         oper.setProperty("propertyC", 20);
222         oper.setProperty("propertyD", "valueD");
223
224         assertEquals("valueA", oper.getProperty("propertyA"));
225         assertEquals("valueB", oper.getProperty("propertyB"));
226         assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
227
228         assertEquals("valueD", oper.getRequiredProperty("propertyD", "typeD"));
229
230         assertThatIllegalStateException().isThrownBy(() -> oper.getRequiredProperty("propertyUnknown", "some type"))
231                         .withMessage("missing some type");
232     }
233
234     @Test
235     public void testStart() {
236         verifyRun("testStart", 1, 1, OperationResult.SUCCESS);
237     }
238
239     /**
240      * Tests start() with multiple running requests.
241      */
242     @Test
243     public void testStartMultiple() {
244         for (int count = 0; count < MAX_PARALLEL; ++count) {
245             oper.start();
246         }
247
248         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
249
250         assertNotNull(opstart);
251         assertNotNull(opend);
252         assertEquals(OperationResult.SUCCESS, opend.getResult());
253
254         assertEquals(MAX_PARALLEL, numStart);
255         assertEquals(MAX_PARALLEL, oper.getCount());
256         assertEquals(MAX_PARALLEL, numEnd);
257     }
258
259     @Test
260     public void testStartOperationAsync() {
261         oper.start();
262         assertTrue(executor.runAll(MAX_REQUESTS));
263
264         assertEquals(1, oper.getCount());
265     }
266
267     @Test
268     public void testIsSuccess() {
269         assertFalse(oper.isSuccess(null));
270
271         OperationOutcome outcome = new OperationOutcome();
272
273         outcome.setResult(OperationResult.SUCCESS);
274         assertTrue(oper.isSuccess(outcome));
275
276         for (OperationResult failure : FAILURE_RESULTS) {
277             outcome.setResult(failure);
278             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
279         }
280     }
281
    /**
     * Verifies that isActorFailed() is true only when the outcome is a FAILURE that
     * carries this operation's actor and operation names.
     */
    @Test
    public void testIsActorFailed() {
        // a null outcome is not an actor failure
        assertFalse(oper.isActorFailed(null));

        OperationOutcome outcome = params.makeOutcome();

        // incorrect outcome
        outcome.setResult(OperationResult.SUCCESS);
        assertFalse(oper.isActorFailed(outcome));

        outcome.setResult(OperationResult.FAILURE_RETRIES);
        assertFalse(oper.isActorFailed(outcome));

        // correct outcome
        outcome.setResult(OperationResult.FAILURE);

        // incorrect actor
        outcome.setActor(MY_SINK);
        assertFalse(oper.isActorFailed(outcome));
        outcome.setActor(null);
        assertFalse(oper.isActorFailed(outcome));
        // restore the actor before varying the operation
        outcome.setActor(ACTOR);

        // incorrect operation
        outcome.setOperation(MY_SINK);
        assertFalse(oper.isActorFailed(outcome));
        outcome.setOperation(null);
        assertFalse(oper.isActorFailed(outcome));
        outcome.setOperation(OPERATION);

        // correct values
        assertTrue(oper.isActorFailed(outcome));
    }
315
316     @Test
317     public void testDoOperation() {
318         /*
319          * Use an operation that doesn't override doOperation().
320          */
321         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
322
323         oper2.start();
324         assertTrue(executor.runAll(MAX_REQUESTS));
325
326         assertNotNull(opend);
327         assertEquals(OperationResult.FAILURE_EXCEPTION, opend.getResult());
328     }
329
330     @Test
331     public void testTimeout() throws Exception {
332
333         // use a real executor
334         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
335
336         // trigger timeout very quickly
337         oper = new MyOper() {
338             @Override
339             protected long getTimeoutMs(Integer timeoutSec) {
340                 return 1;
341             }
342
343             @Override
344             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
345
346                 OperationOutcome outcome2 = params.makeOutcome();
347                 outcome2.setResult(OperationResult.SUCCESS);
348
349                 /*
350                  * Create an incomplete future that will timeout after the operation's
351                  * timeout. If it fires before the other timer, then it will return a
352                  * SUCCESS outcome.
353                  */
354                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
355                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
356                                 params.getExecutor());
357
358                 return future;
359             }
360         };
361
362         assertEquals(OperationResult.FAILURE_TIMEOUT, oper.start().get().getResult());
363     }
364
365     /**
366      * Tests retry functions, when the count is set to zero and retries are exhausted.
367      */
368     @Test
369     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
370         params = params.toBuilder().retry(0).build();
371
372         // new params, thus need a new operation
373         oper = new MyOper();
374
375         oper.setMaxFailures(10);
376
377         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, OperationResult.FAILURE);
378     }
379
380     /**
381      * Tests retry functions, when the count is null and retries are exhausted.
382      */
383     @Test
384     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
385         params = params.toBuilder().retry(null).build();
386
387         // new params, thus need a new operation
388         oper = new MyOper();
389
390         oper.setMaxFailures(10);
391
392         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, OperationResult.FAILURE);
393     }
394
395     /**
396      * Tests retry functions, when retries are exhausted.
397      */
398     @Test
399     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
400         final int maxRetries = 3;
401         params = params.toBuilder().retry(maxRetries).build();
402
403         // new params, thus need a new operation
404         oper = new MyOper();
405
406         oper.setMaxFailures(10);
407
408         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
409                         OperationResult.FAILURE_RETRIES);
410     }
411
412     /**
413      * Tests retry functions, when a success follows some retries.
414      */
415     @Test
416     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
417         params = params.toBuilder().retry(10).build();
418
419         // new params, thus need a new operation
420         oper = new MyOper();
421
422         final int maxFailures = 3;
423         oper.setMaxFailures(maxFailures);
424
425         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
426                         OperationResult.SUCCESS);
427     }
428
429     /**
430      * Tests retry functions, when the outcome is {@code null}.
431      */
432     @Test
433     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
434
435         // arrange to return null from doOperation()
436         oper = new MyOper() {
437             @Override
438             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
439
440                 // update counters
441                 super.doOperation(attempt, outcome);
442                 return null;
443             }
444         };
445
446         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, OperationResult.FAILURE, noop());
447     }
448
    /**
     * Verifies sleep(): non-positive durations yield an already-completed future, and
     * a longer sleep is still pending when a shorter one, started later, completes.
     */
    @Test
    public void testSleep() throws Exception {
        // negative duration
        CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
        assertTrue(future.isDone());
        assertNull(future.get());

        // edge case
        future = oper.sleep(0, TimeUnit.SECONDS);
        assertTrue(future.isDone());
        assertNull(future.get());

        /*
         * Start a second sleep we can use to check the first while it's running.
         */
        tstart = Instant.now();
        future = oper.sleep(100, TimeUnit.MILLISECONDS);

        CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);

        // wait for second to complete and verify that the first has not completed
        future2.get();
        assertFalse(future.isDone());

        // wait for first to complete
        future.get();

        // allow a millisecond of slack on the 100 ms sleep
        long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
        assertTrue(diff >= 99);
    }
478
    /**
     * Verifies that isSameOperation() is true only when the outcome carries both this
     * operation's actor name and its operation name.
     */
    @Test
    public void testIsSameOperation() {
        // a null outcome never matches
        assertFalse(oper.isSameOperation(null));

        OperationOutcome outcome = params.makeOutcome();

        // wrong actor - should be false
        outcome.setActor(null);
        assertFalse(oper.isSameOperation(outcome));
        outcome.setActor(MY_SINK);
        assertFalse(oper.isSameOperation(outcome));
        outcome.setActor(ACTOR);

        // wrong operation - should be false
        outcome.setOperation(null);
        assertFalse(oper.isSameOperation(outcome));
        outcome.setOperation(MY_SINK);
        assertFalse(oper.isSameOperation(outcome));
        outcome.setOperation(OPERATION);

        // both names now match
        assertTrue(oper.isSameOperation(outcome));
    }
501
502     @Test
503     public void testFromException() {
504         // arrange to generate an exception when operation runs
505         oper.setGenException(true);
506
507         verifyRun("testFromException", 1, 1, OperationResult.FAILURE_EXCEPTION);
508     }
509
510     /**
511      * Tests fromException() when there is no exception.
512      */
513     @Test
514     public void testFromExceptionNoExcept() {
515         verifyRun("testFromExceptionNoExcept", 1, 1, OperationResult.SUCCESS);
516     }
517
    /**
     * Tests both flavors of anyOf(), because one invokes the other. In each scenario
     * exactly one task supplies a completed future, and the result is expected to
     * complete with that task's outcome regardless of the task's position.
     */
    @Test
    public void testAnyOf() throws Exception {
        // first task completes, others do not
        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();

        final OperationOutcome outcome = params.makeOutcome();

        tasks.add(() -> CompletableFuture.completedFuture(outcome));
        tasks.add(() -> new CompletableFuture<>());
        // a supplier that yields no future at all
        tasks.add(() -> null);
        tasks.add(() -> new CompletableFuture<>());

        CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertTrue(result.isDone());
        assertSame(outcome, result.get());

        // repeat using array form
        @SuppressWarnings("unchecked")
        Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
        result = oper.anyOf(tasks.toArray(taskArray));
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertTrue(result.isDone());
        assertSame(outcome, result.get());

        // second task completes, others do not
        tasks.clear();
        tasks.add(() -> new CompletableFuture<>());
        tasks.add(() -> CompletableFuture.completedFuture(outcome));
        tasks.add(() -> new CompletableFuture<>());

        result = oper.anyOf(tasks);
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertTrue(result.isDone());
        assertSame(outcome, result.get());

        // third task completes, others do not
        tasks.clear();
        tasks.add(() -> new CompletableFuture<>());
        tasks.add(() -> new CompletableFuture<>());
        tasks.add(() -> CompletableFuture.completedFuture(outcome));

        result = oper.anyOf(tasks);
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertTrue(result.isDone());
        assertSame(outcome, result.get());
    }
568
569     /**
570      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
571      */
572     @Test
573     @SuppressWarnings("unchecked")
574     public void testAnyOfEdge() throws Exception {
575         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
576
577         // zero items: check both using a list and using an array
578         assertNull(oper.anyOf(tasks));
579         assertNull(oper.anyOf());
580
581         // one item: : check both using a list and using an array
582         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
583         tasks.add(() -> future1);
584
585         assertSame(future1, oper.anyOf(tasks));
586         assertSame(future1, oper.anyOf(() -> future1));
587     }
588
    /**
     * Tests the array flavor of allOf(): the result must stay incomplete until every
     * supplied future has completed, whatever the order of completion.
     */
    @Test
    public void testAllOfArray() throws Exception {
        final OperationOutcome outcome = params.makeOutcome();

        CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
        CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
        CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();

        // the third supplier yields no future at all
        @SuppressWarnings("unchecked")
        CompletableFuture<OperationOutcome> result =
                        oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);

        // nothing has completed yet
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertFalse(result.isDone());
        future1.complete(outcome);

        // complete 3 before 2
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertFalse(result.isDone());
        future3.complete(outcome);

        // future2 is still outstanding
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertFalse(result.isDone());
        future2.complete(outcome);

        // all of them are now done
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertTrue(result.isDone());
        assertSame(outcome, result.get());
    }
619
    /**
     * Tests the list flavor of allOf(): the result must stay incomplete until every
     * supplied future has completed, whatever the order of completion.
     */
    @Test
    public void testAllOfList() throws Exception {
        final OperationOutcome outcome = params.makeOutcome();

        CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
        CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
        CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();

        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
        tasks.add(() -> future1);
        tasks.add(() -> future2);
        // a supplier that yields no future at all
        tasks.add(() -> null);
        tasks.add(() -> future3);

        CompletableFuture<OperationOutcome> result = oper.allOf(tasks);

        // nothing has completed yet
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertFalse(result.isDone());
        future1.complete(outcome);

        // complete 3 before 2
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertFalse(result.isDone());
        future3.complete(outcome);

        // future2 is still outstanding
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertFalse(result.isDone());
        future2.complete(outcome);

        // all of them are now done
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertTrue(result.isDone());
        assertSame(outcome, result.get());
    }
654
655     /**
656      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
657      */
658     @Test
659     @SuppressWarnings("unchecked")
660     public void testAllOfEdge() throws Exception {
661         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
662
663         // zero items: check both using a list and using an array
664         assertNull(oper.allOf(tasks));
665         assertNull(oper.allOf());
666
667         // one item: : check both using a list and using an array
668         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
669         tasks.add(() -> future1);
670
671         assertSame(future1, oper.allOf(tasks));
672         assertSame(future1, oper.allOf(() -> future1));
673     }
674
675     @Test
676     public void testAttachFutures() throws Exception {
677         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
678
679         // third task throws an exception during construction
680         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
681         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
682         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
683         tasks.add(() -> future1);
684         tasks.add(() -> future2);
685         tasks.add(() -> {
686             throw new IllegalStateException(EXPECTED_EXCEPTION);
687         });
688         tasks.add(() -> future3);
689
690         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
691
692         // should have canceled the first two, but not the last
693         assertTrue(future1.isCancelled());
694         assertTrue(future2.isCancelled());
695         assertFalse(future3.isCancelled());
696     }
697
698     @Test
699     public void testCombineOutcomes() throws Exception {
700         // only one outcome
701         verifyOutcomes(0, OperationResult.SUCCESS);
702         verifyOutcomes(0, OperationResult.FAILURE_EXCEPTION);
703
704         // maximum is in different positions
705         verifyOutcomes(0, OperationResult.FAILURE, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD);
706         verifyOutcomes(1, OperationResult.SUCCESS, OperationResult.FAILURE, OperationResult.FAILURE_GUARD);
707         verifyOutcomes(2, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD, OperationResult.FAILURE);
708
709         // null outcome - takes precedence over a success
710         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
711         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
712         tasks.add(() -> CompletableFuture.completedFuture(null));
713         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
714         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
715
716         assertTrue(executor.runAll(MAX_REQUESTS));
717         assertTrue(result.isDone());
718         assertNull(result.get());
719
720         // one throws an exception during execution
721         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
722
723         tasks.clear();
724         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
725         tasks.add(() -> CompletableFuture.failedFuture(except));
726         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
727         result = oper.allOf(tasks);
728
729         assertTrue(executor.runAll(MAX_REQUESTS));
730         assertTrue(result.isCompletedExceptionally());
731         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
732     }
733
    /**
     * Tests both flavors of sequence(), because one invokes the other. Covers a run in
     * which every task succeeds, and a run in which a failing task ends the sequence
     * with the failure as the result.
     */
    @Test
    public void testSequence() throws Exception {
        final OperationOutcome outcome = params.makeOutcome();

        List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
        tasks.add(() -> CompletableFuture.completedFuture(outcome));
        // a supplier that yields no future at all
        tasks.add(() -> null);
        tasks.add(() -> CompletableFuture.completedFuture(outcome));
        tasks.add(() -> CompletableFuture.completedFuture(outcome));

        CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertTrue(result.isDone());
        assertSame(outcome, result.get());

        // repeat using array form
        @SuppressWarnings("unchecked")
        Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
        result = oper.sequence(tasks.toArray(taskArray));
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertTrue(result.isDone());
        assertSame(outcome, result.get());

        // second task fails, third should not run
        OperationOutcome failure = params.makeOutcome();
        failure.setResult(OperationResult.FAILURE);
        tasks.clear();
        tasks.add(() -> CompletableFuture.completedFuture(outcome));
        tasks.add(() -> CompletableFuture.completedFuture(failure));
        tasks.add(() -> CompletableFuture.completedFuture(outcome));

        // the failure, rather than the outcome of the last task, is the result
        result = oper.sequence(tasks);
        assertTrue(executor.runAll(MAX_REQUESTS));
        assertTrue(result.isDone());
        assertSame(failure, result.get());
    }
773
774     /**
775      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
776      */
777     @Test
778     @SuppressWarnings("unchecked")
779     public void testSequenceEdge() throws Exception {
780         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
781
782         // zero items: check both using a list and using an array
783         assertNull(oper.sequence(tasks));
784         assertNull(oper.sequence());
785
786         // one item: : check both using a list and using an array
787         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
788         tasks.add(() -> future1);
789
790         assertSame(future1, oper.sequence(tasks));
791         assertSame(future1, oper.sequence(() -> future1));
792     }
793
794     private void verifyOutcomes(int expected, OperationResult... results) throws Exception {
795         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
796
797         OperationOutcome expectedOutcome = null;
798
799         for (int count = 0; count < results.length; ++count) {
800             OperationOutcome outcome = params.makeOutcome();
801             outcome.setResult(results[count]);
802             tasks.add(() -> CompletableFuture.completedFuture(outcome));
803
804             if (count == expected) {
805                 expectedOutcome = outcome;
806             }
807         }
808
809         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
810
811         assertTrue(executor.runAll(MAX_REQUESTS));
812         assertTrue(result.isDone());
813         assertSame(expectedOutcome, result.get());
814     }
815
816     @Test
817     public void testDetmPriority() throws CoderException {
818         assertEquals(1, oper.detmPriority(null));
819
820         OperationOutcome outcome = params.makeOutcome();
821
822         Map<OperationResult, Integer> map = Map.of(OperationResult.SUCCESS, 0, OperationResult.FAILURE_GUARD, 2,
823                 OperationResult.FAILURE_RETRIES, 3, OperationResult.FAILURE, 4, OperationResult.FAILURE_TIMEOUT, 5,
824                 OperationResult.FAILURE_EXCEPTION, 6);
825
826         for (Entry<OperationResult, Integer> ent : map.entrySet()) {
827             outcome.setResult(ent.getKey());
828             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
829         }
830
831         /*
832          * Test null result. We can't actually set it to null, because the set() method
833          * won't allow it. Instead, we decode it from a structure.
834          */
835         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
836         assertEquals(1, oper.detmPriority(outcome));
837     }
838
839     /**
840      * Tests callbackStarted() when the pipeline has already been stopped.
841      */
842     @Test
843     public void testCallbackStartedNotRunning() {
844         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
845
846         /*
847          * arrange to stop the controller when the start-callback is invoked, but capture
848          * the outcome
849          */
850         params = params.toBuilder().startCallback(oper -> {
851             starter(oper);
852             future.get().cancel(false);
853         }).build();
854
855         // new params, thus need a new operation
856         oper = new MyOper();
857
858         future.set(oper.start());
859         assertTrue(executor.runAll(MAX_REQUESTS));
860
861         // should have only run once
862         assertEquals(1, numStart);
863     }
864
865     /**
866      * Tests callbackCompleted() when the pipeline has already been stopped.
867      */
868     @Test
869     public void testCallbackCompletedNotRunning() {
870         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
871
872         // arrange to stop the controller when the start-callback is invoked
873         params = params.toBuilder().startCallback(oper -> {
874             future.get().cancel(false);
875         }).build();
876
877         // new params, thus need a new operation
878         oper = new MyOper();
879
880         future.set(oper.start());
881         assertTrue(executor.runAll(MAX_REQUESTS));
882
883         // should not have been set
884         assertNull(opend);
885         assertEquals(0, numEnd);
886     }
887
888     @Test
889     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
890         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
891
892         OperationOutcome outcome;
893
894         outcome = new OperationOutcome();
895         oper.setOutcome(outcome, timex);
896         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
897         assertEquals(OperationResult.FAILURE_TIMEOUT, outcome.getResult());
898
899         outcome = new OperationOutcome();
900         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
901         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
902         assertEquals(OperationResult.FAILURE_EXCEPTION, outcome.getResult());
903     }
904
905     @Test
906     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
907         OperationOutcome outcome;
908
909         outcome = new OperationOutcome();
910         oper.setOutcome(outcome, OperationResult.SUCCESS);
911         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
912         assertEquals(OperationResult.SUCCESS, outcome.getResult());
913
914         oper.setOutcome(outcome, OperationResult.SUCCESS);
915         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
916         assertEquals(OperationResult.SUCCESS, outcome.getResult());
917
918         for (OperationResult result : FAILURE_RESULTS) {
919             outcome = new OperationOutcome();
920             oper.setOutcome(outcome, result);
921             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
922             assertEquals(result.toString(), result, outcome.getResult());
923         }
924     }
925
926     @Test
927     public void testMakeOutcome() {
928         oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, MY_TARGET_ENTITY);
929         assertEquals(MY_TARGET_ENTITY, oper.makeOutcome().getTarget());
930     }
931
932     @Test
933     public void testIsTimeout() {
934         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
935
936         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
937         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
938         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
939         assertFalse(oper.isTimeout(new CompletionException(null)));
940         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
941
942         assertTrue(oper.isTimeout(timex));
943         assertTrue(oper.isTimeout(new CompletionException(timex)));
944     }
945
946     @Test
947     public void testLogMessage() {
948         final String infraStr = SINK_INFRA.toString();
949
950         // log structured data
951         appender.clearExtractions();
952         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
953         List<String> output = appender.getExtracted();
954         assertEquals(1, output.size());
955
956         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
957                         .contains("{\n  \"text\": \"my-text\"\n}");
958
959         // repeat with a response
960         appender.clearExtractions();
961         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
962         output = appender.getExtracted();
963         assertEquals(1, output.size());
964
965         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
966                         .contains("{\n  \"text\": \"my-text\"\n}");
967
968         // log a plain string
969         appender.clearExtractions();
970         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
971         output = appender.getExtracted();
972         assertEquals(1, output.size());
973         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
974
975         // log a null request
976         appender.clearExtractions();
977         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
978         output = appender.getExtracted();
979         assertEquals(1, output.size());
980
981         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
982
983         // generate exception from coder
984         setOperCoderException();
985
986         appender.clearExtractions();
987         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
988         output = appender.getExtracted();
989         assertEquals(2, output.size());
990         assertThat(output.get(0)).contains("cannot pretty-print request");
991         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
992
993         // repeat with a response
994         appender.clearExtractions();
995         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
996         output = appender.getExtracted();
997         assertEquals(2, output.size());
998         assertThat(output.get(0)).contains("cannot pretty-print response");
999         assertThat(output.get(1)).contains(MY_SOURCE);
1000     }
1001
1002     @Test
1003     public void testGetRetry() {
1004         assertEquals(0, oper.getRetry(null));
1005         assertEquals(10, oper.getRetry(10));
1006     }
1007
1008     @Test
1009     public void testGetRetryWait() {
1010         // need an operator that doesn't override the retry time
1011         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1012         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1013     }
1014
1015     @Test
1016     public void testGetTimeOutMs() {
1017         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1018
1019         params = params.toBuilder().timeoutSec(null).build();
1020
1021         // new params, thus need a new operation
1022         oper = new MyOper();
1023
1024         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1025     }
1026
1027     private void starter(OperationOutcome oper) {
1028         ++numStart;
1029         tstart = oper.getStart();
1030         opstart = oper;
1031         starts.add(oper);
1032     }
1033
1034     private void completer(OperationOutcome oper) {
1035         ++numEnd;
1036         opend = oper;
1037         ends.add(oper);
1038     }
1039
1040     /**
1041      * Gets a function that does nothing.
1042      *
1043      * @param <T> type of input parameter expected by the function
1044      * @return a function that does nothing
1045      */
1046     private <T> Consumer<T> noop() {
1047         return unused -> {
1048         };
1049     }
1050
1051     private OperationOutcome makeSuccess() {
1052         OperationOutcome outcome = params.makeOutcome();
1053         outcome.setResult(OperationResult.SUCCESS);
1054
1055         return outcome;
1056     }
1057
1058     /**
1059      * Verifies a run.
1060      *
1061      * @param testName test name
1062      * @param expectedCallbacks number of callbacks expected
1063      * @param expectedOperations number of operation invocations expected
1064      * @param expectedResult expected outcome
1065      */
1066     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1067             OperationResult expectedResult) {
1068
1069         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
1070     }
1071
1072     /**
1073      * Verifies a run.
1074      *
1075      * @param testName test name
1076      * @param expectedCallbacks number of callbacks expected
1077      * @param expectedOperations number of operation invocations expected
1078      * @param expectedResult expected outcome
1079      * @param manipulator function to modify the future returned by
1080      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1081      *        in the executor are run
1082      */
1083     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1084             OperationResult expectedResult, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1085
1086         tstart = null;
1087         opstart = null;
1088         opend = null;
1089         starts.clear();
1090         ends.clear();
1091
1092         CompletableFuture<OperationOutcome> future = oper.start();
1093
1094         manipulator.accept(future);
1095
1096         assertTrue(testName, executor.runAll(MAX_REQUESTS));
1097
1098         assertEquals(testName, expectedCallbacks, numStart);
1099         assertEquals(testName, expectedCallbacks, numEnd);
1100
1101         if (expectedCallbacks > 0) {
1102             assertNotNull(testName, opstart);
1103             assertNotNull(testName, opend);
1104             assertEquals(testName, expectedResult, opend.getResult());
1105
1106             assertSame(testName, tstart, opstart.getStart());
1107             assertSame(testName, tstart, opend.getStart());
1108
1109             try {
1110                 assertTrue(future.isDone());
1111                 assertEquals(testName, opend, future.get());
1112
1113                 // "start" is never final
1114                 for (OperationOutcome outcome : starts) {
1115                     assertFalse(testName, outcome.isFinalOutcome());
1116                 }
1117
1118                 // only the last "complete" is final
1119                 assertTrue(testName, ends.removeLast().isFinalOutcome());
1120
1121                 for (OperationOutcome outcome : ends) {
1122                     assertFalse(outcome.isFinalOutcome());
1123                 }
1124
1125             } catch (InterruptedException | ExecutionException e) {
1126                 throw new IllegalStateException(e);
1127             }
1128
1129             if (expectedOperations > 0) {
1130                 assertNotNull(testName, oper.getSubRequestId());
1131                 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
1132                 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
1133             }
1134         }
1135
1136         assertEquals(testName, expectedOperations, oper.getCount());
1137     }
1138
1139     /**
1140      * Creates a new {@link #oper} whose coder will throw an exception.
1141      */
1142     private void setOperCoderException() {
1143         oper = new MyOper() {
1144             @Override
1145             protected Coder getCoder() {
1146                 return new StandardCoder() {
1147                     @Override
1148                     public String encode(Object object, boolean pretty) throws CoderException {
1149                         throw new CoderException(EXPECTED_EXCEPTION);
1150                     }
1151                 };
1152             }
1153         };
1154     }
1155
1156
1157     @Getter
1158     public static class MyData {
1159         private String text = TEXT;
1160     }
1161
1162
1163     private class MyOper extends OperationPartial {
1164         @Getter
1165         private int count = 0;
1166
1167         @Setter
1168         private boolean genException;
1169         @Setter
1170         private int maxFailures = 0;
1171         @Setter
1172         private CompletableFuture<OperationOutcome> preProc;
1173
1174
1175         public MyOper() {
1176             super(OperationPartialTest.this.params, config, PROP_NAMES);
1177         }
1178
1179         @Override
1180         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1181             ++count;
1182             if (genException) {
1183                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1184             }
1185
1186             operation.setSubRequestId(String.valueOf(attempt));
1187
1188             if (count > maxFailures) {
1189                 operation.setResult(OperationResult.SUCCESS);
1190             } else {
1191                 operation.setResult(OperationResult.FAILURE);
1192             }
1193
1194             return operation;
1195         }
1196
1197         @Override
1198         protected long getRetryWaitMs() {
1199             /*
1200              * Sleep timers run in the background, but we want to control things via the
1201              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1202              */
1203             return 0L;
1204         }
1205     }
1206 }