Change payload to Map<String,Object> so it's more versatile
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import ch.qos.logback.classic.Logger;
36 import java.time.Instant;
37 import java.util.Arrays;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Map.Entry;
42 import java.util.UUID;
43 import java.util.concurrent.CompletableFuture;
44 import java.util.concurrent.CompletionException;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.ForkJoinPool;
47 import java.util.concurrent.Future;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.TimeoutException;
50 import java.util.concurrent.atomic.AtomicReference;
51 import java.util.function.Consumer;
52 import java.util.function.Supplier;
53 import java.util.stream.Collectors;
54 import lombok.Getter;
55 import lombok.Setter;
56 import org.junit.AfterClass;
57 import org.junit.Before;
58 import org.junit.BeforeClass;
59 import org.junit.Test;
60 import org.mockito.ArgumentCaptor;
61 import org.mockito.Mock;
62 import org.mockito.MockitoAnnotations;
63 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
64 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
65 import org.onap.policy.common.utils.coder.Coder;
66 import org.onap.policy.common.utils.coder.CoderException;
67 import org.onap.policy.common.utils.coder.StandardCoder;
68 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
69 import org.onap.policy.common.utils.time.PseudoExecutor;
70 import org.onap.policy.controlloop.ControlLoopOperation;
71 import org.onap.policy.controlloop.VirtualControlLoopEvent;
72 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
73 import org.onap.policy.controlloop.actorserviceprovider.Operation;
74 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
75 import org.onap.policy.controlloop.actorserviceprovider.Operator;
76 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
77 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
78 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
79 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
80 import org.onap.policy.controlloop.policy.PolicyResult;
81 import org.slf4j.LoggerFactory;
82
83 public class OperationPartialTest {
84     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
85     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
86     private static final int MAX_REQUESTS = 100;
87     private static final int MAX_PARALLEL = 10;
88     private static final String EXPECTED_EXCEPTION = "expected exception";
89     private static final String ACTOR = "my-actor";
90     private static final String OPERATION = "my-operation";
91     private static final String MY_SINK = "my-sink";
92     private static final String MY_SOURCE = "my-source";
93     private static final String MY_TARGET_ENTITY = "my-entity";
94     private static final String TEXT = "my-text";
95     private static final int TIMEOUT = 1000;
96     private static final UUID REQ_ID = UUID.randomUUID();
97
98     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
99                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
100
101     /**
102      * Used to attach an appender to the class' logger.
103      */
104     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
105     private static final ExtractAppender appender = new ExtractAppender();
106
107     @Mock
108     private ActorService service;
109     @Mock
110     private Actor guardActor;
111     @Mock
112     private Operator guardOperator;
113     @Mock
114     private Operation guardOperation;
115
116     private VirtualControlLoopEvent event;
117     private ControlLoopEventContext context;
118     private PseudoExecutor executor;
119     private ControlLoopOperationParams params;
120
121     private MyOper oper;
122
123     private int numStart;
124     private int numEnd;
125
126     private Instant tstart;
127
128     private OperationOutcome opstart;
129     private OperationOutcome opend;
130
131     private OperatorConfig config;
132
133     /**
134      * Attaches the appender to the logger.
135      */
136     @BeforeClass
137     public static void setUpBeforeClass() throws Exception {
138         /**
139          * Attach appender to the logger.
140          */
141         appender.setContext(logger.getLoggerContext());
142         appender.start();
143
144         logger.addAppender(appender);
145     }
146
147     /**
148      * Stops the appender.
149      */
150     @AfterClass
151     public static void tearDownAfterClass() {
152         appender.stop();
153     }
154
155     /**
156      * Initializes the fields, including {@link #oper}.
157      */
158     @Before
159     public void setUp() {
160         MockitoAnnotations.initMocks(this);
161
162         event = new VirtualControlLoopEvent();
163         event.setRequestId(REQ_ID);
164
165         context = new ControlLoopEventContext(event);
166         executor = new PseudoExecutor();
167
168         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
169                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
170                         .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
171
172         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
173         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
174         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
175         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
176
177         config = new OperatorConfig(executor);
178
179         oper = new MyOper();
180
181         tstart = null;
182
183         opstart = null;
184         opend = null;
185     }
186
187     @Test
188     public void testOperatorPartial_testGetActorName_testGetName() {
189         assertEquals(ACTOR, oper.getActorName());
190         assertEquals(OPERATION, oper.getName());
191         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
192     }
193
194     @Test
195     public void testGetBlockingThread() throws Exception {
196         CompletableFuture<Void> future = new CompletableFuture<>();
197
198         // use the real executor
199         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
200             @Override
201             public Operation buildOperation(ControlLoopOperationParams params) {
202                 return null;
203             }
204         };
205
206         oper2.getBlockingExecutor().execute(() -> future.complete(null));
207
208         assertNull(future.get(5, TimeUnit.SECONDS));
209     }
210
211     @Test
212     public void testStart() {
213         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
214     }
215
216     /**
217      * Tests start() with multiple running requests.
218      */
219     @Test
220     public void testStartMultiple() {
221         for (int count = 0; count < MAX_PARALLEL; ++count) {
222             oper.start();
223         }
224
225         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
226
227         assertNotNull(opstart);
228         assertNotNull(opend);
229         assertEquals(PolicyResult.SUCCESS, opend.getResult());
230
231         assertEquals(MAX_PARALLEL, numStart);
232         assertEquals(MAX_PARALLEL, oper.getCount());
233         assertEquals(MAX_PARALLEL, numEnd);
234     }
235
236     /**
237      * Tests startPreprocessor() when the preprocessor returns a failure.
238      */
239     @Test
240     public void testStartPreprocessorFailure() {
241         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
242
243         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
244     }
245
246     /**
247      * Tests startPreprocessor() when the preprocessor throws an exception.
248      */
249     @Test
250     public void testStartPreprocessorException() {
251         // arrange for the preprocessor to throw an exception
252         oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
253
254         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
255     }
256
257     /**
258      * Tests startPreprocessor() when the pipeline is not running.
259      */
260     @Test
261     public void testStartPreprocessorNotRunning() {
262         // arrange for the preprocessor to return success, which will be ignored
263         // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
264
265         oper.start().cancel(false);
266         assertTrue(executor.runAll(MAX_REQUESTS));
267
268         assertNull(opstart);
269         assertNull(opend);
270
271         assertEquals(0, numStart);
272         assertEquals(0, oper.getCount());
273         assertEquals(0, numEnd);
274     }
275
276     /**
277      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
278      */
279     @Test
280     public void testStartPreprocessorBuilderException() {
281         oper = new MyOper() {
282             @Override
283             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
284                 throw new IllegalStateException(EXPECTED_EXCEPTION);
285             }
286         };
287
288         assertThatIllegalStateException().isThrownBy(() -> oper.start());
289
290         // should be nothing in the queue
291         assertEquals(0, executor.getQueueLength());
292     }
293
294     @Test
295     public void testStartPreprocessorAsync() {
296         assertNull(oper.startPreprocessorAsync());
297     }
298
299     @Test
300     public void testStartGuardAsync() throws Exception {
301         CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
302         assertTrue(future.isDone());
303         assertEquals(PolicyResult.SUCCESS, future.get().getResult());
304
305         // verify the parameters that were passed
306         ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
307                         ArgumentCaptor.forClass(ControlLoopOperationParams.class);
308         verify(guardOperator).buildOperation(paramsCaptor.capture());
309
310         params = paramsCaptor.getValue();
311         assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
312         assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
313         assertNull(params.getRetry());
314         assertNull(params.getTimeoutSec());
315
316         Map<String, Object> payload = params.getPayload();
317         assertNotNull(payload);
318
319         @SuppressWarnings("unchecked")
320         Map<String, Object> resource = (Map<String, Object>) payload.get("resource");
321         assertNotNull(resource);
322
323         @SuppressWarnings("unchecked")
324         Map<String, Object> guard = (Map<String, Object>) resource.get("guard");
325         assertEquals(oper.makeGuardPayload(), guard);
326     }
327
328     @Test
329     public void testMakeGuardPayload() {
330         Map<String, Object> payload = oper.makeGuardPayload();
331         assertSame(REQ_ID, payload.get("requestId"));
332
333         // request id changes, so remove it
334         payload.remove("requestId");
335
336         assertEquals("{actor=my-actor, recipe=my-operation, target=my-entity}", payload.toString());
337
338         // repeat, but with closed loop name
339         event.setClosedLoopControlName("my-loop");
340         payload = oper.makeGuardPayload();
341         payload.remove("requestId");
342         assertEquals("{actor=my-actor, recipe=my-operation, target=my-entity, clname=my-loop}", payload.toString());
343     }
344
345     @Test
346     public void testStartOperationAsync() {
347         oper.start();
348         assertTrue(executor.runAll(MAX_REQUESTS));
349
350         assertEquals(1, oper.getCount());
351     }
352
353     @Test
354     public void testIsSuccess() {
355         OperationOutcome outcome = new OperationOutcome();
356
357         outcome.setResult(PolicyResult.SUCCESS);
358         assertTrue(oper.isSuccess(outcome));
359
360         for (PolicyResult failure : FAILURE_RESULTS) {
361             outcome.setResult(failure);
362             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
363         }
364     }
365
366     @Test
367     public void testIsActorFailed() {
368         assertFalse(oper.isActorFailed(null));
369
370         OperationOutcome outcome = params.makeOutcome();
371
372         // incorrect outcome
373         outcome.setResult(PolicyResult.SUCCESS);
374         assertFalse(oper.isActorFailed(outcome));
375
376         outcome.setResult(PolicyResult.FAILURE_RETRIES);
377         assertFalse(oper.isActorFailed(outcome));
378
379         // correct outcome
380         outcome.setResult(PolicyResult.FAILURE);
381
382         // incorrect actor
383         outcome.setActor(MY_SINK);
384         assertFalse(oper.isActorFailed(outcome));
385         outcome.setActor(null);
386         assertFalse(oper.isActorFailed(outcome));
387         outcome.setActor(ACTOR);
388
389         // incorrect operation
390         outcome.setOperation(MY_SINK);
391         assertFalse(oper.isActorFailed(outcome));
392         outcome.setOperation(null);
393         assertFalse(oper.isActorFailed(outcome));
394         outcome.setOperation(OPERATION);
395
396         // correct values
397         assertTrue(oper.isActorFailed(outcome));
398     }
399
400     @Test
401     public void testDoOperation() {
402         /*
403          * Use an operation that doesn't override doOperation().
404          */
405         OperationPartial oper2 = new OperationPartial(params, config) {};
406
407         oper2.start();
408         assertTrue(executor.runAll(MAX_REQUESTS));
409
410         assertNotNull(opend);
411         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
412     }
413
414     @Test
415     public void testTimeout() throws Exception {
416
417         // use a real executor
418         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
419
420         // trigger timeout very quickly
421         oper = new MyOper() {
422             @Override
423             protected long getTimeoutMs(Integer timeoutSec) {
424                 return 1;
425             }
426
427             @Override
428             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
429
430                 OperationOutcome outcome2 = params.makeOutcome();
431                 outcome2.setResult(PolicyResult.SUCCESS);
432
433                 /*
434                  * Create an incomplete future that will timeout after the operation's
435                  * timeout. If it fires before the other timer, then it will return a
436                  * SUCCESS outcome.
437                  */
438                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
439                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
440                                 params.getExecutor());
441
442                 return future;
443             }
444         };
445
446         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
447     }
448
449     /**
450      * Tests retry functions, when the count is set to zero and retries are exhausted.
451      */
452     @Test
453     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
454         params = params.toBuilder().retry(0).build();
455
456         // new params, thus need a new operation
457         oper = new MyOper();
458
459         oper.setMaxFailures(10);
460
461         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
462     }
463
464     /**
465      * Tests retry functions, when the count is null and retries are exhausted.
466      */
467     @Test
468     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
469         params = params.toBuilder().retry(null).build();
470
471         // new params, thus need a new operation
472         oper = new MyOper();
473
474         oper.setMaxFailures(10);
475
476         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
477     }
478
479     /**
480      * Tests retry functions, when retries are exhausted.
481      */
482     @Test
483     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
484         final int maxRetries = 3;
485         params = params.toBuilder().retry(maxRetries).build();
486
487         // new params, thus need a new operation
488         oper = new MyOper();
489
490         oper.setMaxFailures(10);
491
492         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
493                         PolicyResult.FAILURE_RETRIES);
494     }
495
496     /**
497      * Tests retry functions, when a success follows some retries.
498      */
499     @Test
500     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
501         params = params.toBuilder().retry(10).build();
502
503         // new params, thus need a new operation
504         oper = new MyOper();
505
506         final int maxFailures = 3;
507         oper.setMaxFailures(maxFailures);
508
509         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
510                         PolicyResult.SUCCESS);
511     }
512
513     /**
514      * Tests retry functions, when the outcome is {@code null}.
515      */
516     @Test
517     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
518
519         // arrange to return null from doOperation()
520         oper = new MyOper() {
521             @Override
522             protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
523
524                 // update counters
525                 super.doOperation(attempt, operation);
526                 return null;
527             }
528         };
529
530         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
531     }
532
533     @Test
534     public void testSleep() throws Exception {
535         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
536         assertTrue(future.isDone());
537         assertNull(future.get());
538
539         // edge case
540         future = oper.sleep(0, TimeUnit.SECONDS);
541         assertTrue(future.isDone());
542         assertNull(future.get());
543
544         /*
545          * Start a second sleep we can use to check the first while it's running.
546          */
547         tstart = Instant.now();
548         future = oper.sleep(100, TimeUnit.MILLISECONDS);
549
550         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
551
552         // wait for second to complete and verify that the first has not completed
553         future2.get();
554         assertFalse(future.isDone());
555
556         // wait for second to complete
557         future.get();
558
559         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
560         assertTrue(diff >= 99);
561     }
562
563     @Test
564     public void testIsSameOperation() {
565         assertFalse(oper.isSameOperation(null));
566
567         OperationOutcome outcome = params.makeOutcome();
568
569         // wrong actor - should be false
570         outcome.setActor(null);
571         assertFalse(oper.isSameOperation(outcome));
572         outcome.setActor(MY_SINK);
573         assertFalse(oper.isSameOperation(outcome));
574         outcome.setActor(ACTOR);
575
576         // wrong operation - should be null
577         outcome.setOperation(null);
578         assertFalse(oper.isSameOperation(outcome));
579         outcome.setOperation(MY_SINK);
580         assertFalse(oper.isSameOperation(outcome));
581         outcome.setOperation(OPERATION);
582
583         assertTrue(oper.isSameOperation(outcome));
584     }
585
586     /**
587      * Tests handleFailure() when the outcome is a success.
588      */
589     @Test
590     public void testHandlePreprocessorFailureTrue() {
591         oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
592         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
593     }
594
595     /**
596      * Tests handleFailure() when the outcome is <i>not</i> a success.
597      */
598     @Test
599     public void testHandlePreprocessorFailureFalse() throws Exception {
600         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
601         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
602     }
603
604     /**
605      * Tests handleFailure() when the outcome is {@code null}.
606      */
607     @Test
608     public void testHandlePreprocessorFailureNull() throws Exception {
609         // arrange to return a null outcome from the preprocessor
610         oper.setPreProc(CompletableFuture.completedFuture(null));
611         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
612     }
613
614     @Test
615     public void testFromException() {
616         // arrange to generate an exception when operation runs
617         oper.setGenException(true);
618
619         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
620     }
621
622     /**
623      * Tests fromException() when there is no exception.
624      */
625     @Test
626     public void testFromExceptionNoExcept() {
627         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
628     }
629
630     /**
631      * Tests both flavors of anyOf(), because one invokes the other.
632      */
633     @Test
634     public void testAnyOf() throws Exception {
635         // first task completes, others do not
636         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
637
638         final OperationOutcome outcome = params.makeOutcome();
639
640         tasks.add(() -> CompletableFuture.completedFuture(outcome));
641         tasks.add(() -> new CompletableFuture<>());
642         tasks.add(() -> null);
643         tasks.add(() -> new CompletableFuture<>());
644
645         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
646         assertTrue(executor.runAll(MAX_REQUESTS));
647         assertTrue(result.isDone());
648         assertSame(outcome, result.get());
649
650         // repeat using array form
651         @SuppressWarnings("unchecked")
652         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
653         result = oper.anyOf(tasks.toArray(taskArray));
654         assertTrue(executor.runAll(MAX_REQUESTS));
655         assertTrue(result.isDone());
656         assertSame(outcome, result.get());
657
658         // second task completes, others do not
659         tasks.clear();
660         tasks.add(() -> new CompletableFuture<>());
661         tasks.add(() -> CompletableFuture.completedFuture(outcome));
662         tasks.add(() -> new CompletableFuture<>());
663
664         result = oper.anyOf(tasks);
665         assertTrue(executor.runAll(MAX_REQUESTS));
666         assertTrue(result.isDone());
667         assertSame(outcome, result.get());
668
669         // third task completes, others do not
670         tasks.clear();
671         tasks.add(() -> new CompletableFuture<>());
672         tasks.add(() -> new CompletableFuture<>());
673         tasks.add(() -> CompletableFuture.completedFuture(outcome));
674
675         result = oper.anyOf(tasks);
676         assertTrue(executor.runAll(MAX_REQUESTS));
677         assertTrue(result.isDone());
678         assertSame(outcome, result.get());
679     }
680
681     /**
682      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
683      */
684     @Test
685     @SuppressWarnings("unchecked")
686     public void testAnyOfEdge() throws Exception {
687         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
688
689         // zero items: check both using a list and using an array
690         assertNull(oper.anyOf(tasks));
691         assertNull(oper.anyOf());
692
693         // one item: : check both using a list and using an array
694         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
695         tasks.add(() -> future1);
696
697         assertSame(future1, oper.anyOf(tasks));
698         assertSame(future1, oper.anyOf(() -> future1));
699     }
700
701     @Test
702     public void testAllOfArray() throws Exception {
703         final OperationOutcome outcome = params.makeOutcome();
704
705         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
706         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
707         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
708
709         @SuppressWarnings("unchecked")
710         CompletableFuture<OperationOutcome> result =
711                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
712
713         assertTrue(executor.runAll(MAX_REQUESTS));
714         assertFalse(result.isDone());
715         future1.complete(outcome);
716
717         // complete 3 before 2
718         assertTrue(executor.runAll(MAX_REQUESTS));
719         assertFalse(result.isDone());
720         future3.complete(outcome);
721
722         assertTrue(executor.runAll(MAX_REQUESTS));
723         assertFalse(result.isDone());
724         future2.complete(outcome);
725
726         // all of them are now done
727         assertTrue(executor.runAll(MAX_REQUESTS));
728         assertTrue(result.isDone());
729         assertSame(outcome, result.get());
730     }
731
732     @Test
733     public void testAllOfList() throws Exception {
734         final OperationOutcome outcome = params.makeOutcome();
735
736         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
737         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
738         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
739
740         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
741         tasks.add(() -> future1);
742         tasks.add(() -> future2);
743         tasks.add(() -> null);
744         tasks.add(() -> future3);
745
746         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
747
748         assertTrue(executor.runAll(MAX_REQUESTS));
749         assertFalse(result.isDone());
750         future1.complete(outcome);
751
752         // complete 3 before 2
753         assertTrue(executor.runAll(MAX_REQUESTS));
754         assertFalse(result.isDone());
755         future3.complete(outcome);
756
757         assertTrue(executor.runAll(MAX_REQUESTS));
758         assertFalse(result.isDone());
759         future2.complete(outcome);
760
761         // all of them are now done
762         assertTrue(executor.runAll(MAX_REQUESTS));
763         assertTrue(result.isDone());
764         assertSame(outcome, result.get());
765     }
766
767     /**
768      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
769      */
770     @Test
771     @SuppressWarnings("unchecked")
772     public void testAllOfEdge() throws Exception {
773         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
774
775         // zero items: check both using a list and using an array
776         assertNull(oper.allOf(tasks));
777         assertNull(oper.allOf());
778
779         // one item: : check both using a list and using an array
780         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
781         tasks.add(() -> future1);
782
783         assertSame(future1, oper.allOf(tasks));
784         assertSame(future1, oper.allOf(() -> future1));
785     }
786
787     @Test
788     public void testAttachFutures() throws Exception {
789         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
790
791         // third task throws an exception during construction
792         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
793         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
794         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
795         tasks.add(() -> future1);
796         tasks.add(() -> future2);
797         tasks.add(() -> {
798             throw new IllegalStateException(EXPECTED_EXCEPTION);
799         });
800         tasks.add(() -> future3);
801
802         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
803
804         // should have canceled the first two, but not the last
805         assertTrue(future1.isCancelled());
806         assertTrue(future2.isCancelled());
807         assertFalse(future3.isCancelled());
808     }
809
810     @Test
811     public void testCombineOutcomes() throws Exception {
812         // only one outcome
813         verifyOutcomes(0, PolicyResult.SUCCESS);
814         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
815
816         // maximum is in different positions
817         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
818         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
819         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
820
821         // null outcome - takes precedence over a success
822         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
823         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
824         tasks.add(() -> CompletableFuture.completedFuture(null));
825         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
826         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
827
828         assertTrue(executor.runAll(MAX_REQUESTS));
829         assertTrue(result.isDone());
830         assertNull(result.get());
831
832         // one throws an exception during execution
833         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
834
835         tasks.clear();
836         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
837         tasks.add(() -> CompletableFuture.failedFuture(except));
838         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
839         result = oper.allOf(tasks);
840
841         assertTrue(executor.runAll(MAX_REQUESTS));
842         assertTrue(result.isCompletedExceptionally());
843         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
844     }
845
846     /**
847      * Tests both flavors of sequence(), because one invokes the other.
848      */
849     @Test
850     public void testSequence() throws Exception {
851         final OperationOutcome outcome = params.makeOutcome();
852
853         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
854         tasks.add(() -> CompletableFuture.completedFuture(outcome));
855         tasks.add(() -> null);
856         tasks.add(() -> CompletableFuture.completedFuture(outcome));
857         tasks.add(() -> CompletableFuture.completedFuture(outcome));
858
859         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
860         assertTrue(executor.runAll(MAX_REQUESTS));
861         assertTrue(result.isDone());
862         assertSame(outcome, result.get());
863
864         // repeat using array form
865         @SuppressWarnings("unchecked")
866         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
867         result = oper.sequence(tasks.toArray(taskArray));
868         assertTrue(executor.runAll(MAX_REQUESTS));
869         assertTrue(result.isDone());
870         assertSame(outcome, result.get());
871
872         // second task fails, third should not run
873         OperationOutcome failure = params.makeOutcome();
874         failure.setResult(PolicyResult.FAILURE);
875         tasks.clear();
876         tasks.add(() -> CompletableFuture.completedFuture(outcome));
877         tasks.add(() -> CompletableFuture.completedFuture(failure));
878         tasks.add(() -> CompletableFuture.completedFuture(outcome));
879
880         result = oper.sequence(tasks);
881         assertTrue(executor.runAll(MAX_REQUESTS));
882         assertTrue(result.isDone());
883         assertSame(failure, result.get());
884     }
885
886     /**
887      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
888      */
889     @Test
890     @SuppressWarnings("unchecked")
891     public void testSequenceEdge() throws Exception {
892         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
893
894         // zero items: check both using a list and using an array
895         assertNull(oper.sequence(tasks));
896         assertNull(oper.sequence());
897
898         // one item: : check both using a list and using an array
899         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
900         tasks.add(() -> future1);
901
902         assertSame(future1, oper.sequence(tasks));
903         assertSame(future1, oper.sequence(() -> future1));
904     }
905
906     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
907         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
908
909         OperationOutcome expectedOutcome = null;
910
911         for (int count = 0; count < results.length; ++count) {
912             OperationOutcome outcome = params.makeOutcome();
913             outcome.setResult(results[count]);
914             tasks.add(() -> CompletableFuture.completedFuture(outcome));
915
916             if (count == expected) {
917                 expectedOutcome = outcome;
918             }
919         }
920
921         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
922
923         assertTrue(executor.runAll(MAX_REQUESTS));
924         assertTrue(result.isDone());
925         assertSame(expectedOutcome, result.get());
926     }
927
928     @Test
929     public void testDetmPriority() throws CoderException {
930         assertEquals(1, oper.detmPriority(null));
931
932         OperationOutcome outcome = params.makeOutcome();
933
934         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
935                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
936                         PolicyResult.FAILURE_EXCEPTION, 6);
937
938         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
939             outcome.setResult(ent.getKey());
940             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
941         }
942
943         /*
944          * Test null result. We can't actually set it to null, because the set() method
945          * won't allow it. Instead, we decode it from a structure.
946          */
947         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
948         assertEquals(1, oper.detmPriority(outcome));
949     }
950
951     /**
952      * Tests callbackStarted() when the pipeline has already been stopped.
953      */
954     @Test
955     public void testCallbackStartedNotRunning() {
956         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
957
958         /*
959          * arrange to stop the controller when the start-callback is invoked, but capture
960          * the outcome
961          */
962         params = params.toBuilder().startCallback(oper -> {
963             starter(oper);
964             future.get().cancel(false);
965         }).build();
966
967         // new params, thus need a new operation
968         oper = new MyOper();
969
970         future.set(oper.start());
971         assertTrue(executor.runAll(MAX_REQUESTS));
972
973         // should have only run once
974         assertEquals(1, numStart);
975     }
976
977     /**
978      * Tests callbackCompleted() when the pipeline has already been stopped.
979      */
980     @Test
981     public void testCallbackCompletedNotRunning() {
982         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
983
984         // arrange to stop the controller when the start-callback is invoked
985         params = params.toBuilder().startCallback(oper -> {
986             future.get().cancel(false);
987         }).build();
988
989         // new params, thus need a new operation
990         oper = new MyOper();
991
992         future.set(oper.start());
993         assertTrue(executor.runAll(MAX_REQUESTS));
994
995         // should not have been set
996         assertNull(opend);
997         assertEquals(0, numEnd);
998     }
999
1000     @Test
1001     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1002         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1003
1004         OperationOutcome outcome;
1005
1006         outcome = new OperationOutcome();
1007         oper.setOutcome(outcome, timex);
1008         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1009         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1010
1011         outcome = new OperationOutcome();
1012         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1013         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1014         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1015     }
1016
1017     @Test
1018     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1019         OperationOutcome outcome;
1020
1021         outcome = new OperationOutcome();
1022         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1023         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1024         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1025
1026         for (PolicyResult result : FAILURE_RESULTS) {
1027             outcome = new OperationOutcome();
1028             oper.setOutcome(outcome, result);
1029             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1030             assertEquals(result.toString(), result, outcome.getResult());
1031         }
1032     }
1033
1034     @Test
1035     public void testIsTimeout() {
1036         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1037
1038         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1039         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1040         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1041         assertFalse(oper.isTimeout(new CompletionException(null)));
1042         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1043
1044         assertTrue(oper.isTimeout(timex));
1045         assertTrue(oper.isTimeout(new CompletionException(timex)));
1046     }
1047
1048     @Test
1049     public void testLogMessage() {
1050         final String infraStr = SINK_INFRA.toString();
1051
1052         // log structured data
1053         appender.clearExtractions();
1054         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1055         List<String> output = appender.getExtracted();
1056         assertEquals(1, output.size());
1057
1058         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1059                         .contains("{\n  \"text\": \"my-text\"\n}");
1060
1061         // repeat with a response
1062         appender.clearExtractions();
1063         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1064         output = appender.getExtracted();
1065         assertEquals(1, output.size());
1066
1067         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1068                         .contains("{\n  \"text\": \"my-text\"\n}");
1069
1070         // log a plain string
1071         appender.clearExtractions();
1072         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1073         output = appender.getExtracted();
1074         assertEquals(1, output.size());
1075         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1076
1077         // log a null request
1078         appender.clearExtractions();
1079         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1080         output = appender.getExtracted();
1081         assertEquals(1, output.size());
1082
1083         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1084
1085         // generate exception from coder
1086         setOperCoderException();
1087
1088         appender.clearExtractions();
1089         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1090         output = appender.getExtracted();
1091         assertEquals(2, output.size());
1092         assertThat(output.get(0)).contains("cannot pretty-print request");
1093         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1094
1095         // repeat with a response
1096         appender.clearExtractions();
1097         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1098         output = appender.getExtracted();
1099         assertEquals(2, output.size());
1100         assertThat(output.get(0)).contains("cannot pretty-print response");
1101         assertThat(output.get(1)).contains(MY_SOURCE);
1102     }
1103
1104     @Test
1105     public void testGetRetry() {
1106         assertEquals(0, oper.getRetry(null));
1107         assertEquals(10, oper.getRetry(10));
1108     }
1109
1110     @Test
1111     public void testGetRetryWait() {
1112         // need an operator that doesn't override the retry time
1113         OperationPartial oper2 = new OperationPartial(params, config) {};
1114         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1115     }
1116
1117     @Test
1118     public void testGetTimeOutMs() {
1119         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1120
1121         params = params.toBuilder().timeoutSec(null).build();
1122
1123         // new params, thus need a new operation
1124         oper = new MyOper();
1125
1126         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1127     }
1128
1129     private void starter(OperationOutcome oper) {
1130         ++numStart;
1131         tstart = oper.getStart();
1132         opstart = oper;
1133     }
1134
1135     private void completer(OperationOutcome oper) {
1136         ++numEnd;
1137         opend = oper;
1138     }
1139
1140     /**
1141      * Gets a function that does nothing.
1142      *
1143      * @param <T> type of input parameter expected by the function
1144      * @return a function that does nothing
1145      */
1146     private <T> Consumer<T> noop() {
1147         return unused -> {
1148         };
1149     }
1150
1151     private OperationOutcome makeSuccess() {
1152         OperationOutcome outcome = params.makeOutcome();
1153         outcome.setResult(PolicyResult.SUCCESS);
1154
1155         return outcome;
1156     }
1157
1158     private OperationOutcome makeFailure() {
1159         OperationOutcome outcome = params.makeOutcome();
1160         outcome.setResult(PolicyResult.FAILURE);
1161
1162         return outcome;
1163     }
1164
1165     /**
1166      * Verifies a run.
1167      *
1168      * @param testName test name
1169      * @param expectedCallbacks number of callbacks expected
1170      * @param expectedOperations number of operation invocations expected
1171      * @param expectedResult expected outcome
1172      */
1173     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1174                     PolicyResult expectedResult) {
1175
1176         String expectedSubRequestId =
1177                         (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1178
1179         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1180     }
1181
1182     /**
1183      * Verifies a run.
1184      *
1185      * @param testName test name
1186      * @param expectedCallbacks number of callbacks expected
1187      * @param expectedOperations number of operation invocations expected
1188      * @param expectedResult expected outcome
1189      * @param expectedSubRequestId expected sub request ID
1190      * @param manipulator function to modify the future returned by
1191      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1192      *        in the executor are run
1193      */
1194     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1195                     String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1196
1197         CompletableFuture<OperationOutcome> future = oper.start();
1198
1199         manipulator.accept(future);
1200
1201         assertTrue(testName, executor.runAll(MAX_REQUESTS));
1202
1203         assertEquals(testName, expectedCallbacks, numStart);
1204         assertEquals(testName, expectedCallbacks, numEnd);
1205
1206         if (expectedCallbacks > 0) {
1207             assertNotNull(testName, opstart);
1208             assertNotNull(testName, opend);
1209             assertEquals(testName, expectedResult, opend.getResult());
1210
1211             assertSame(testName, tstart, opstart.getStart());
1212             assertSame(testName, tstart, opend.getStart());
1213
1214             try {
1215                 assertTrue(future.isDone());
1216                 assertSame(testName, opend, future.get());
1217
1218             } catch (InterruptedException | ExecutionException e) {
1219                 throw new IllegalStateException(e);
1220             }
1221
1222             if (expectedOperations > 0) {
1223                 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1224             }
1225         }
1226
1227         assertEquals(testName, expectedOperations, oper.getCount());
1228     }
1229
1230     /**
1231      * Creates a new {@link #oper} whose coder will throw an exception.
1232      */
1233     private void setOperCoderException() {
1234         oper = new MyOper() {
1235             @Override
1236             protected Coder makeCoder() {
1237                 return new StandardCoder() {
1238                     @Override
1239                     public String encode(Object object, boolean pretty) throws CoderException {
1240                         throw new CoderException(EXPECTED_EXCEPTION);
1241                     }
1242                 };
1243             }
1244         };
1245     }
1246
1247
1248     @Getter
1249     public static class MyData {
1250         private String text = TEXT;
1251     }
1252
1253
1254     private class MyOper extends OperationPartial {
1255         @Getter
1256         private int count = 0;
1257
1258         @Setter
1259         private boolean genException;
1260         @Setter
1261         private int maxFailures = 0;
1262         @Setter
1263         private CompletableFuture<OperationOutcome> preProc;
1264
1265
1266         public MyOper() {
1267             super(OperationPartialTest.this.params, config);
1268         }
1269
1270         @Override
1271         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1272             ++count;
1273             if (genException) {
1274                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1275             }
1276
1277             operation.setSubRequestId(String.valueOf(attempt));
1278
1279             if (count > maxFailures) {
1280                 operation.setResult(PolicyResult.SUCCESS);
1281             } else {
1282                 operation.setResult(PolicyResult.FAILURE);
1283             }
1284
1285             return operation;
1286         }
1287
1288         @Override
1289         protected long getRetryWaitMs() {
1290             /*
1291              * Sleep timers run in the background, but we want to control things via the
1292              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1293              */
1294             return 0L;
1295         }
1296
1297         @Override
1298         protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
1299             return (preProc != null ? preProc : super.startPreprocessorAsync());
1300         }
1301     }
1302 }