Sequence throws NPE if task outcome is null
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import ch.qos.logback.classic.Logger;
36 import java.time.Instant;
37 import java.util.ArrayDeque;
38 import java.util.Arrays;
39 import java.util.Deque;
40 import java.util.LinkedList;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.UUID;
45 import java.util.concurrent.CompletableFuture;
46 import java.util.concurrent.CompletionException;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.ForkJoinPool;
49 import java.util.concurrent.Future;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import java.util.concurrent.atomic.AtomicReference;
53 import java.util.function.Consumer;
54 import java.util.function.Supplier;
55 import java.util.stream.Collectors;
56 import lombok.Getter;
57 import lombok.Setter;
58 import org.junit.AfterClass;
59 import org.junit.Before;
60 import org.junit.BeforeClass;
61 import org.junit.Test;
62 import org.mockito.ArgumentCaptor;
63 import org.mockito.Mock;
64 import org.mockito.MockitoAnnotations;
65 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
66 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
67 import org.onap.policy.common.utils.coder.Coder;
68 import org.onap.policy.common.utils.coder.CoderException;
69 import org.onap.policy.common.utils.coder.StandardCoder;
70 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
71 import org.onap.policy.common.utils.time.PseudoExecutor;
72 import org.onap.policy.controlloop.ControlLoopOperation;
73 import org.onap.policy.controlloop.VirtualControlLoopEvent;
74 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
75 import org.onap.policy.controlloop.actorserviceprovider.Operation;
76 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
77 import org.onap.policy.controlloop.actorserviceprovider.Operator;
78 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
79 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
80 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
81 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
82 import org.onap.policy.controlloop.policy.PolicyResult;
83 import org.slf4j.LoggerFactory;
84
85 public class OperationPartialTest {
86     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
87     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
88     private static final int MAX_REQUESTS = 100;
89     private static final int MAX_PARALLEL = 10;
90     private static final String EXPECTED_EXCEPTION = "expected exception";
91     private static final String ACTOR = "my-actor";
92     private static final String OPERATION = "my-operation";
93     private static final String MY_SINK = "my-sink";
94     private static final String MY_SOURCE = "my-source";
95     private static final String MY_TARGET_ENTITY = "my-entity";
96     private static final String TEXT = "my-text";
97     private static final int TIMEOUT = 1000;
98     private static final UUID REQ_ID = UUID.randomUUID();
99
100     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
101                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
102
103     /**
104      * Used to attach an appender to the class' logger.
105      */
106     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
107     private static final ExtractAppender appender = new ExtractAppender();
108
109     @Mock
110     private ActorService service;
111     @Mock
112     private Actor guardActor;
113     @Mock
114     private Operator guardOperator;
115     @Mock
116     private Operation guardOperation;
117
118     private VirtualControlLoopEvent event;
119     private ControlLoopEventContext context;
120     private PseudoExecutor executor;
121     private ControlLoopOperationParams params;
122
123     private MyOper oper;
124
125     private int numStart;
126     private int numEnd;
127
128     private Instant tstart;
129
130     private OperationOutcome opstart;
131     private OperationOutcome opend;
132
133     private Deque<OperationOutcome> starts;
134     private Deque<OperationOutcome> ends;
135
136     private OperatorConfig config;
137
138     /**
139      * Attaches the appender to the logger.
140      */
141     @BeforeClass
142     public static void setUpBeforeClass() throws Exception {
143         /**
144          * Attach appender to the logger.
145          */
146         appender.setContext(logger.getLoggerContext());
147         appender.start();
148
149         logger.addAppender(appender);
150     }
151
152     /**
153      * Stops the appender.
154      */
155     @AfterClass
156     public static void tearDownAfterClass() {
157         appender.stop();
158     }
159
160     /**
161      * Initializes the fields, including {@link #oper}.
162      */
163     @Before
164     public void setUp() {
165         MockitoAnnotations.initMocks(this);
166
167         event = new VirtualControlLoopEvent();
168         event.setRequestId(REQ_ID);
169
170         context = new ControlLoopEventContext(event);
171         executor = new PseudoExecutor();
172
173         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
174                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
175                         .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
176
177         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
178         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
179         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
180         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
181
182         config = new OperatorConfig(executor);
183
184         oper = new MyOper();
185
186         tstart = null;
187
188         opstart = null;
189         opend = null;
190
191         starts = new ArrayDeque<>(10);
192         ends = new ArrayDeque<>(10);
193     }
194
195     @Test
196     public void testOperatorPartial_testGetActorName_testGetName() {
197         assertEquals(ACTOR, oper.getActorName());
198         assertEquals(OPERATION, oper.getName());
199         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
200     }
201
202     @Test
203     public void testGetBlockingThread() throws Exception {
204         CompletableFuture<Void> future = new CompletableFuture<>();
205
206         // use the real executor
207         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
208             @Override
209             public Operation buildOperation(ControlLoopOperationParams params) {
210                 return null;
211             }
212         };
213
214         oper2.getBlockingExecutor().execute(() -> future.complete(null));
215
216         assertNull(future.get(5, TimeUnit.SECONDS));
217     }
218
219     @Test
220     public void testStart() {
221         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
222     }
223
224     /**
225      * Tests start() with multiple running requests.
226      */
227     @Test
228     public void testStartMultiple() {
229         for (int count = 0; count < MAX_PARALLEL; ++count) {
230             oper.start();
231         }
232
233         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
234
235         assertNotNull(opstart);
236         assertNotNull(opend);
237         assertEquals(PolicyResult.SUCCESS, opend.getResult());
238
239         assertEquals(MAX_PARALLEL, numStart);
240         assertEquals(MAX_PARALLEL, oper.getCount());
241         assertEquals(MAX_PARALLEL, numEnd);
242     }
243
244     /**
245      * Tests startPreprocessor() when the preprocessor returns a failure.
246      */
247     @Test
248     public void testStartPreprocessorFailure() {
249         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
250
251         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
252     }
253
254     /**
255      * Tests startPreprocessor() when the preprocessor throws an exception.
256      */
257     @Test
258     public void testStartPreprocessorException() {
259         // arrange for the preprocessor to throw an exception
260         oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
261
262         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
263     }
264
265     /**
266      * Tests startPreprocessor() when the pipeline is not running.
267      */
268     @Test
269     public void testStartPreprocessorNotRunning() {
270         // arrange for the preprocessor to return success, which will be ignored
271         // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
272
273         oper.start().cancel(false);
274         assertTrue(executor.runAll(MAX_REQUESTS));
275
276         assertNull(opstart);
277         assertNull(opend);
278
279         assertEquals(0, numStart);
280         assertEquals(0, oper.getCount());
281         assertEquals(0, numEnd);
282     }
283
284     /**
285      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
286      */
287     @Test
288     public void testStartPreprocessorBuilderException() {
289         oper = new MyOper() {
290             @Override
291             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
292                 throw new IllegalStateException(EXPECTED_EXCEPTION);
293             }
294         };
295
296         assertThatIllegalStateException().isThrownBy(() -> oper.start());
297
298         // should be nothing in the queue
299         assertEquals(0, executor.getQueueLength());
300     }
301
302     @Test
303     public void testStartPreprocessorAsync() {
304         assertNull(oper.startPreprocessorAsync());
305     }
306
307     @Test
308     public void testStartGuardAsync() throws Exception {
309         CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
310         assertTrue(future.isDone());
311         assertEquals(PolicyResult.SUCCESS, future.get().getResult());
312
313         // verify the parameters that were passed
314         ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
315                         ArgumentCaptor.forClass(ControlLoopOperationParams.class);
316         verify(guardOperator).buildOperation(paramsCaptor.capture());
317
318         params = paramsCaptor.getValue();
319         assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
320         assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
321         assertNull(params.getRetry());
322         assertNull(params.getTimeoutSec());
323
324         Map<String, Object> payload = params.getPayload();
325         assertNotNull(payload);
326
327         @SuppressWarnings("unchecked")
328         Map<String, Object> resource = (Map<String, Object>) payload.get("resource");
329         assertNotNull(resource);
330
331         @SuppressWarnings("unchecked")
332         Map<String, Object> guard = (Map<String, Object>) resource.get("guard");
333         assertEquals(oper.makeGuardPayload(), guard);
334     }
335
336     @Test
337     public void testMakeGuardPayload() {
338         Map<String, Object> payload = oper.makeGuardPayload();
339         assertSame(REQ_ID, payload.get("requestId"));
340
341         // request id changes, so remove it
342         payload.remove("requestId");
343
344         assertEquals("{actor=my-actor, recipe=my-operation, target=my-entity}", payload.toString());
345
346         // repeat, but with closed loop name
347         event.setClosedLoopControlName("my-loop");
348         payload = oper.makeGuardPayload();
349         payload.remove("requestId");
350         assertEquals("{actor=my-actor, recipe=my-operation, target=my-entity, clname=my-loop}", payload.toString());
351     }
352
353     @Test
354     public void testStartOperationAsync() {
355         oper.start();
356         assertTrue(executor.runAll(MAX_REQUESTS));
357
358         assertEquals(1, oper.getCount());
359     }
360
361     @Test
362     public void testIsSuccess() {
363         assertFalse(oper.isSuccess(null));
364
365         OperationOutcome outcome = new OperationOutcome();
366
367         outcome.setResult(PolicyResult.SUCCESS);
368         assertTrue(oper.isSuccess(outcome));
369
370         for (PolicyResult failure : FAILURE_RESULTS) {
371             outcome.setResult(failure);
372             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
373         }
374     }
375
376     @Test
377     public void testIsActorFailed() {
378         assertFalse(oper.isActorFailed(null));
379
380         OperationOutcome outcome = params.makeOutcome();
381
382         // incorrect outcome
383         outcome.setResult(PolicyResult.SUCCESS);
384         assertFalse(oper.isActorFailed(outcome));
385
386         outcome.setResult(PolicyResult.FAILURE_RETRIES);
387         assertFalse(oper.isActorFailed(outcome));
388
389         // correct outcome
390         outcome.setResult(PolicyResult.FAILURE);
391
392         // incorrect actor
393         outcome.setActor(MY_SINK);
394         assertFalse(oper.isActorFailed(outcome));
395         outcome.setActor(null);
396         assertFalse(oper.isActorFailed(outcome));
397         outcome.setActor(ACTOR);
398
399         // incorrect operation
400         outcome.setOperation(MY_SINK);
401         assertFalse(oper.isActorFailed(outcome));
402         outcome.setOperation(null);
403         assertFalse(oper.isActorFailed(outcome));
404         outcome.setOperation(OPERATION);
405
406         // correct values
407         assertTrue(oper.isActorFailed(outcome));
408     }
409
410     @Test
411     public void testDoOperation() {
412         /*
413          * Use an operation that doesn't override doOperation().
414          */
415         OperationPartial oper2 = new OperationPartial(params, config) {};
416
417         oper2.start();
418         assertTrue(executor.runAll(MAX_REQUESTS));
419
420         assertNotNull(opend);
421         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
422     }
423
424     @Test
425     public void testTimeout() throws Exception {
426
427         // use a real executor
428         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
429
430         // trigger timeout very quickly
431         oper = new MyOper() {
432             @Override
433             protected long getTimeoutMs(Integer timeoutSec) {
434                 return 1;
435             }
436
437             @Override
438             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
439
440                 OperationOutcome outcome2 = params.makeOutcome();
441                 outcome2.setResult(PolicyResult.SUCCESS);
442
443                 /*
444                  * Create an incomplete future that will timeout after the operation's
445                  * timeout. If it fires before the other timer, then it will return a
446                  * SUCCESS outcome.
447                  */
448                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
449                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
450                                 params.getExecutor());
451
452                 return future;
453             }
454         };
455
456         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
457     }
458
459     /**
460      * Tests retry functions, when the count is set to zero and retries are exhausted.
461      */
462     @Test
463     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
464         params = params.toBuilder().retry(0).build();
465
466         // new params, thus need a new operation
467         oper = new MyOper();
468
469         oper.setMaxFailures(10);
470
471         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
472     }
473
474     /**
475      * Tests retry functions, when the count is null and retries are exhausted.
476      */
477     @Test
478     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
479         params = params.toBuilder().retry(null).build();
480
481         // new params, thus need a new operation
482         oper = new MyOper();
483
484         oper.setMaxFailures(10);
485
486         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
487     }
488
489     /**
490      * Tests retry functions, when retries are exhausted.
491      */
492     @Test
493     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
494         final int maxRetries = 3;
495         params = params.toBuilder().retry(maxRetries).build();
496
497         // new params, thus need a new operation
498         oper = new MyOper();
499
500         oper.setMaxFailures(10);
501
502         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
503                         PolicyResult.FAILURE_RETRIES);
504     }
505
506     /**
507      * Tests retry functions, when a success follows some retries.
508      */
509     @Test
510     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
511         params = params.toBuilder().retry(10).build();
512
513         // new params, thus need a new operation
514         oper = new MyOper();
515
516         final int maxFailures = 3;
517         oper.setMaxFailures(maxFailures);
518
519         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
520                         PolicyResult.SUCCESS);
521     }
522
523     /**
524      * Tests retry functions, when the outcome is {@code null}.
525      */
526     @Test
527     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
528
529         // arrange to return null from doOperation()
530         oper = new MyOper() {
531             @Override
532             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
533
534                 // update counters
535                 super.doOperation(attempt, outcome);
536                 return null;
537             }
538         };
539
540         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
541     }
542
543     @Test
544     public void testSleep() throws Exception {
545         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
546         assertTrue(future.isDone());
547         assertNull(future.get());
548
549         // edge case
550         future = oper.sleep(0, TimeUnit.SECONDS);
551         assertTrue(future.isDone());
552         assertNull(future.get());
553
554         /*
555          * Start a second sleep we can use to check the first while it's running.
556          */
557         tstart = Instant.now();
558         future = oper.sleep(100, TimeUnit.MILLISECONDS);
559
560         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
561
562         // wait for second to complete and verify that the first has not completed
563         future2.get();
564         assertFalse(future.isDone());
565
566         // wait for second to complete
567         future.get();
568
569         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
570         assertTrue(diff >= 99);
571     }
572
573     @Test
574     public void testIsSameOperation() {
575         assertFalse(oper.isSameOperation(null));
576
577         OperationOutcome outcome = params.makeOutcome();
578
579         // wrong actor - should be false
580         outcome.setActor(null);
581         assertFalse(oper.isSameOperation(outcome));
582         outcome.setActor(MY_SINK);
583         assertFalse(oper.isSameOperation(outcome));
584         outcome.setActor(ACTOR);
585
586         // wrong operation - should be null
587         outcome.setOperation(null);
588         assertFalse(oper.isSameOperation(outcome));
589         outcome.setOperation(MY_SINK);
590         assertFalse(oper.isSameOperation(outcome));
591         outcome.setOperation(OPERATION);
592
593         assertTrue(oper.isSameOperation(outcome));
594     }
595
596     /**
597      * Tests handleFailure() when the outcome is a success.
598      */
599     @Test
600     public void testHandlePreprocessorFailureSuccess() {
601         oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
602         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
603     }
604
605     /**
606      * Tests handleFailure() when the outcome is <i>not</i> a success.
607      */
608     @Test
609     public void testHandlePreprocessorFailureFailed() throws Exception {
610         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
611         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
612     }
613
614     /**
615      * Tests handleFailure() when the outcome is {@code null}.
616      */
617     @Test
618     public void testHandlePreprocessorFailureNull() throws Exception {
619         // arrange to return a null outcome from the preprocessor
620         oper.setPreProc(CompletableFuture.completedFuture(null));
621         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
622     }
623
624     @Test
625     public void testFromException() {
626         // arrange to generate an exception when operation runs
627         oper.setGenException(true);
628
629         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
630     }
631
632     /**
633      * Tests fromException() when there is no exception.
634      */
635     @Test
636     public void testFromExceptionNoExcept() {
637         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
638     }
639
640     /**
641      * Tests both flavors of anyOf(), because one invokes the other.
642      */
643     @Test
644     public void testAnyOf() throws Exception {
645         // first task completes, others do not
646         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
647
648         final OperationOutcome outcome = params.makeOutcome();
649
650         tasks.add(() -> CompletableFuture.completedFuture(outcome));
651         tasks.add(() -> new CompletableFuture<>());
652         tasks.add(() -> null);
653         tasks.add(() -> new CompletableFuture<>());
654
655         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
656         assertTrue(executor.runAll(MAX_REQUESTS));
657         assertTrue(result.isDone());
658         assertSame(outcome, result.get());
659
660         // repeat using array form
661         @SuppressWarnings("unchecked")
662         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
663         result = oper.anyOf(tasks.toArray(taskArray));
664         assertTrue(executor.runAll(MAX_REQUESTS));
665         assertTrue(result.isDone());
666         assertSame(outcome, result.get());
667
668         // second task completes, others do not
669         tasks.clear();
670         tasks.add(() -> new CompletableFuture<>());
671         tasks.add(() -> CompletableFuture.completedFuture(outcome));
672         tasks.add(() -> new CompletableFuture<>());
673
674         result = oper.anyOf(tasks);
675         assertTrue(executor.runAll(MAX_REQUESTS));
676         assertTrue(result.isDone());
677         assertSame(outcome, result.get());
678
679         // third task completes, others do not
680         tasks.clear();
681         tasks.add(() -> new CompletableFuture<>());
682         tasks.add(() -> new CompletableFuture<>());
683         tasks.add(() -> CompletableFuture.completedFuture(outcome));
684
685         result = oper.anyOf(tasks);
686         assertTrue(executor.runAll(MAX_REQUESTS));
687         assertTrue(result.isDone());
688         assertSame(outcome, result.get());
689     }
690
691     /**
692      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
693      */
694     @Test
695     @SuppressWarnings("unchecked")
696     public void testAnyOfEdge() throws Exception {
697         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
698
699         // zero items: check both using a list and using an array
700         assertNull(oper.anyOf(tasks));
701         assertNull(oper.anyOf());
702
703         // one item: : check both using a list and using an array
704         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
705         tasks.add(() -> future1);
706
707         assertSame(future1, oper.anyOf(tasks));
708         assertSame(future1, oper.anyOf(() -> future1));
709     }
710
711     @Test
712     public void testAllOfArray() throws Exception {
713         final OperationOutcome outcome = params.makeOutcome();
714
715         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
716         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
717         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
718
719         @SuppressWarnings("unchecked")
720         CompletableFuture<OperationOutcome> result =
721                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
722
723         assertTrue(executor.runAll(MAX_REQUESTS));
724         assertFalse(result.isDone());
725         future1.complete(outcome);
726
727         // complete 3 before 2
728         assertTrue(executor.runAll(MAX_REQUESTS));
729         assertFalse(result.isDone());
730         future3.complete(outcome);
731
732         assertTrue(executor.runAll(MAX_REQUESTS));
733         assertFalse(result.isDone());
734         future2.complete(outcome);
735
736         // all of them are now done
737         assertTrue(executor.runAll(MAX_REQUESTS));
738         assertTrue(result.isDone());
739         assertSame(outcome, result.get());
740     }
741
742     @Test
743     public void testAllOfList() throws Exception {
744         final OperationOutcome outcome = params.makeOutcome();
745
746         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
747         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
748         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
749
750         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
751         tasks.add(() -> future1);
752         tasks.add(() -> future2);
753         tasks.add(() -> null);
754         tasks.add(() -> future3);
755
756         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
757
758         assertTrue(executor.runAll(MAX_REQUESTS));
759         assertFalse(result.isDone());
760         future1.complete(outcome);
761
762         // complete 3 before 2
763         assertTrue(executor.runAll(MAX_REQUESTS));
764         assertFalse(result.isDone());
765         future3.complete(outcome);
766
767         assertTrue(executor.runAll(MAX_REQUESTS));
768         assertFalse(result.isDone());
769         future2.complete(outcome);
770
771         // all of them are now done
772         assertTrue(executor.runAll(MAX_REQUESTS));
773         assertTrue(result.isDone());
774         assertSame(outcome, result.get());
775     }
776
777     /**
778      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
779      */
780     @Test
781     @SuppressWarnings("unchecked")
782     public void testAllOfEdge() throws Exception {
783         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
784
785         // zero items: check both using a list and using an array
786         assertNull(oper.allOf(tasks));
787         assertNull(oper.allOf());
788
789         // one item: : check both using a list and using an array
790         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
791         tasks.add(() -> future1);
792
793         assertSame(future1, oper.allOf(tasks));
794         assertSame(future1, oper.allOf(() -> future1));
795     }
796
797     @Test
798     public void testAttachFutures() throws Exception {
799         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
800
801         // third task throws an exception during construction
802         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
803         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
804         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
805         tasks.add(() -> future1);
806         tasks.add(() -> future2);
807         tasks.add(() -> {
808             throw new IllegalStateException(EXPECTED_EXCEPTION);
809         });
810         tasks.add(() -> future3);
811
812         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
813
814         // should have canceled the first two, but not the last
815         assertTrue(future1.isCancelled());
816         assertTrue(future2.isCancelled());
817         assertFalse(future3.isCancelled());
818     }
819
820     @Test
821     public void testCombineOutcomes() throws Exception {
822         // only one outcome
823         verifyOutcomes(0, PolicyResult.SUCCESS);
824         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
825
826         // maximum is in different positions
827         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
828         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
829         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
830
831         // null outcome - takes precedence over a success
832         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
833         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
834         tasks.add(() -> CompletableFuture.completedFuture(null));
835         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
836         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
837
838         assertTrue(executor.runAll(MAX_REQUESTS));
839         assertTrue(result.isDone());
840         assertNull(result.get());
841
842         // one throws an exception during execution
843         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
844
845         tasks.clear();
846         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
847         tasks.add(() -> CompletableFuture.failedFuture(except));
848         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
849         result = oper.allOf(tasks);
850
851         assertTrue(executor.runAll(MAX_REQUESTS));
852         assertTrue(result.isCompletedExceptionally());
853         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
854     }
855
856     /**
857      * Tests both flavors of sequence(), because one invokes the other.
858      */
859     @Test
860     public void testSequence() throws Exception {
861         final OperationOutcome outcome = params.makeOutcome();
862
863         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
864         tasks.add(() -> CompletableFuture.completedFuture(outcome));
865         tasks.add(() -> null);
866         tasks.add(() -> CompletableFuture.completedFuture(outcome));
867         tasks.add(() -> CompletableFuture.completedFuture(outcome));
868
869         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
870         assertTrue(executor.runAll(MAX_REQUESTS));
871         assertTrue(result.isDone());
872         assertSame(outcome, result.get());
873
874         // repeat using array form
875         @SuppressWarnings("unchecked")
876         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
877         result = oper.sequence(tasks.toArray(taskArray));
878         assertTrue(executor.runAll(MAX_REQUESTS));
879         assertTrue(result.isDone());
880         assertSame(outcome, result.get());
881
882         // second task fails, third should not run
883         OperationOutcome failure = params.makeOutcome();
884         failure.setResult(PolicyResult.FAILURE);
885         tasks.clear();
886         tasks.add(() -> CompletableFuture.completedFuture(outcome));
887         tasks.add(() -> CompletableFuture.completedFuture(failure));
888         tasks.add(() -> CompletableFuture.completedFuture(outcome));
889
890         result = oper.sequence(tasks);
891         assertTrue(executor.runAll(MAX_REQUESTS));
892         assertTrue(result.isDone());
893         assertSame(failure, result.get());
894     }
895
896     /**
897      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
898      */
899     @Test
900     @SuppressWarnings("unchecked")
901     public void testSequenceEdge() throws Exception {
902         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
903
904         // zero items: check both using a list and using an array
905         assertNull(oper.sequence(tasks));
906         assertNull(oper.sequence());
907
908         // one item: : check both using a list and using an array
909         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
910         tasks.add(() -> future1);
911
912         assertSame(future1, oper.sequence(tasks));
913         assertSame(future1, oper.sequence(() -> future1));
914     }
915
916     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
917         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
918
919         OperationOutcome expectedOutcome = null;
920
921         for (int count = 0; count < results.length; ++count) {
922             OperationOutcome outcome = params.makeOutcome();
923             outcome.setResult(results[count]);
924             tasks.add(() -> CompletableFuture.completedFuture(outcome));
925
926             if (count == expected) {
927                 expectedOutcome = outcome;
928             }
929         }
930
931         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
932
933         assertTrue(executor.runAll(MAX_REQUESTS));
934         assertTrue(result.isDone());
935         assertSame(expectedOutcome, result.get());
936     }
937
938     @Test
939     public void testDetmPriority() throws CoderException {
940         assertEquals(1, oper.detmPriority(null));
941
942         OperationOutcome outcome = params.makeOutcome();
943
944         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
945                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
946                         PolicyResult.FAILURE_EXCEPTION, 6);
947
948         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
949             outcome.setResult(ent.getKey());
950             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
951         }
952
953         /*
954          * Test null result. We can't actually set it to null, because the set() method
955          * won't allow it. Instead, we decode it from a structure.
956          */
957         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
958         assertEquals(1, oper.detmPriority(outcome));
959     }
960
961     /**
962      * Tests callbackStarted() when the pipeline has already been stopped.
963      */
964     @Test
965     public void testCallbackStartedNotRunning() {
966         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
967
968         /*
969          * arrange to stop the controller when the start-callback is invoked, but capture
970          * the outcome
971          */
972         params = params.toBuilder().startCallback(oper -> {
973             starter(oper);
974             future.get().cancel(false);
975         }).build();
976
977         // new params, thus need a new operation
978         oper = new MyOper();
979
980         future.set(oper.start());
981         assertTrue(executor.runAll(MAX_REQUESTS));
982
983         // should have only run once
984         assertEquals(1, numStart);
985     }
986
987     /**
988      * Tests callbackCompleted() when the pipeline has already been stopped.
989      */
990     @Test
991     public void testCallbackCompletedNotRunning() {
992         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
993
994         // arrange to stop the controller when the start-callback is invoked
995         params = params.toBuilder().startCallback(oper -> {
996             future.get().cancel(false);
997         }).build();
998
999         // new params, thus need a new operation
1000         oper = new MyOper();
1001
1002         future.set(oper.start());
1003         assertTrue(executor.runAll(MAX_REQUESTS));
1004
1005         // should not have been set
1006         assertNull(opend);
1007         assertEquals(0, numEnd);
1008     }
1009
1010     @Test
1011     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1012         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1013
1014         OperationOutcome outcome;
1015
1016         outcome = new OperationOutcome();
1017         oper.setOutcome(outcome, timex);
1018         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1019         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1020
1021         outcome = new OperationOutcome();
1022         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1023         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1024         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1025     }
1026
1027     @Test
1028     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1029         OperationOutcome outcome;
1030
1031         outcome = new OperationOutcome();
1032         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1033         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1034         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1035
1036         for (PolicyResult result : FAILURE_RESULTS) {
1037             outcome = new OperationOutcome();
1038             oper.setOutcome(outcome, result);
1039             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1040             assertEquals(result.toString(), result, outcome.getResult());
1041         }
1042     }
1043
1044     @Test
1045     public void testIsTimeout() {
1046         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1047
1048         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1049         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1050         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1051         assertFalse(oper.isTimeout(new CompletionException(null)));
1052         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1053
1054         assertTrue(oper.isTimeout(timex));
1055         assertTrue(oper.isTimeout(new CompletionException(timex)));
1056     }
1057
1058     @Test
1059     public void testLogMessage() {
1060         final String infraStr = SINK_INFRA.toString();
1061
1062         // log structured data
1063         appender.clearExtractions();
1064         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1065         List<String> output = appender.getExtracted();
1066         assertEquals(1, output.size());
1067
1068         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1069                         .contains("{\n  \"text\": \"my-text\"\n}");
1070
1071         // repeat with a response
1072         appender.clearExtractions();
1073         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1074         output = appender.getExtracted();
1075         assertEquals(1, output.size());
1076
1077         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1078                         .contains("{\n  \"text\": \"my-text\"\n}");
1079
1080         // log a plain string
1081         appender.clearExtractions();
1082         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1083         output = appender.getExtracted();
1084         assertEquals(1, output.size());
1085         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1086
1087         // log a null request
1088         appender.clearExtractions();
1089         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1090         output = appender.getExtracted();
1091         assertEquals(1, output.size());
1092
1093         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1094
1095         // generate exception from coder
1096         setOperCoderException();
1097
1098         appender.clearExtractions();
1099         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1100         output = appender.getExtracted();
1101         assertEquals(2, output.size());
1102         assertThat(output.get(0)).contains("cannot pretty-print request");
1103         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1104
1105         // repeat with a response
1106         appender.clearExtractions();
1107         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1108         output = appender.getExtracted();
1109         assertEquals(2, output.size());
1110         assertThat(output.get(0)).contains("cannot pretty-print response");
1111         assertThat(output.get(1)).contains(MY_SOURCE);
1112     }
1113
1114     @Test
1115     public void testGetRetry() {
1116         assertEquals(0, oper.getRetry(null));
1117         assertEquals(10, oper.getRetry(10));
1118     }
1119
1120     @Test
1121     public void testGetRetryWait() {
1122         // need an operator that doesn't override the retry time
1123         OperationPartial oper2 = new OperationPartial(params, config) {};
1124         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1125     }
1126
1127     @Test
1128     public void testGetTimeOutMs() {
1129         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1130
1131         params = params.toBuilder().timeoutSec(null).build();
1132
1133         // new params, thus need a new operation
1134         oper = new MyOper();
1135
1136         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1137     }
1138
1139     private void starter(OperationOutcome oper) {
1140         ++numStart;
1141         tstart = oper.getStart();
1142         opstart = oper;
1143         starts.add(oper);
1144     }
1145
1146     private void completer(OperationOutcome oper) {
1147         ++numEnd;
1148         opend = oper;
1149         ends.add(oper);
1150     }
1151
1152     /**
1153      * Gets a function that does nothing.
1154      *
1155      * @param <T> type of input parameter expected by the function
1156      * @return a function that does nothing
1157      */
1158     private <T> Consumer<T> noop() {
1159         return unused -> {
1160         };
1161     }
1162
1163     private OperationOutcome makeSuccess() {
1164         OperationOutcome outcome = params.makeOutcome();
1165         outcome.setResult(PolicyResult.SUCCESS);
1166
1167         return outcome;
1168     }
1169
1170     private OperationOutcome makeFailure() {
1171         OperationOutcome outcome = params.makeOutcome();
1172         outcome.setResult(PolicyResult.FAILURE);
1173
1174         return outcome;
1175     }
1176
1177     /**
1178      * Verifies a run.
1179      *
1180      * @param testName test name
1181      * @param expectedCallbacks number of callbacks expected
1182      * @param expectedOperations number of operation invocations expected
1183      * @param expectedResult expected outcome
1184      */
1185     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1186                     PolicyResult expectedResult) {
1187
1188         String expectedSubRequestId =
1189                         (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1190
1191         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1192     }
1193
1194     /**
1195      * Verifies a run.
1196      *
1197      * @param testName test name
1198      * @param expectedCallbacks number of callbacks expected
1199      * @param expectedOperations number of operation invocations expected
1200      * @param expectedResult expected outcome
1201      * @param expectedSubRequestId expected sub request ID
1202      * @param manipulator function to modify the future returned by
1203      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1204      *        in the executor are run
1205      */
1206     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1207                     String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1208
1209         tstart = null;
1210         opstart = null;
1211         opend = null;
1212         starts.clear();
1213         ends.clear();
1214
1215         CompletableFuture<OperationOutcome> future = oper.start();
1216
1217         manipulator.accept(future);
1218
1219         assertTrue(testName, executor.runAll(MAX_REQUESTS));
1220
1221         assertEquals(testName, expectedCallbacks, numStart);
1222         assertEquals(testName, expectedCallbacks, numEnd);
1223
1224         if (expectedCallbacks > 0) {
1225             assertNotNull(testName, opstart);
1226             assertNotNull(testName, opend);
1227             assertEquals(testName, expectedResult, opend.getResult());
1228
1229             assertSame(testName, tstart, opstart.getStart());
1230             assertSame(testName, tstart, opend.getStart());
1231
1232             try {
1233                 assertTrue(future.isDone());
1234                 assertEquals(testName, opend, future.get());
1235
1236                 // "start" is never final
1237                 for (OperationOutcome outcome : starts) {
1238                     assertFalse(testName, outcome.isFinalOutcome());
1239                 }
1240
1241                 // only the last "complete" is final
1242                 assertTrue(testName, ends.removeLast().isFinalOutcome());
1243
1244                 for (OperationOutcome outcome : ends) {
1245                     assertFalse(outcome.isFinalOutcome());
1246                 }
1247
1248             } catch (InterruptedException | ExecutionException e) {
1249                 throw new IllegalStateException(e);
1250             }
1251
1252             if (expectedOperations > 0) {
1253                 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1254             }
1255         }
1256
1257         assertEquals(testName, expectedOperations, oper.getCount());
1258     }
1259
1260     /**
1261      * Creates a new {@link #oper} whose coder will throw an exception.
1262      */
1263     private void setOperCoderException() {
1264         oper = new MyOper() {
1265             @Override
1266             protected Coder makeCoder() {
1267                 return new StandardCoder() {
1268                     @Override
1269                     public String encode(Object object, boolean pretty) throws CoderException {
1270                         throw new CoderException(EXPECTED_EXCEPTION);
1271                     }
1272                 };
1273             }
1274         };
1275     }
1276
1277
1278     @Getter
1279     public static class MyData {
1280         private String text = TEXT;
1281     }
1282
1283
1284     private class MyOper extends OperationPartial {
1285         @Getter
1286         private int count = 0;
1287
1288         @Setter
1289         private boolean genException;
1290         @Setter
1291         private int maxFailures = 0;
1292         @Setter
1293         private CompletableFuture<OperationOutcome> preProc;
1294
1295
1296         public MyOper() {
1297             super(OperationPartialTest.this.params, config);
1298         }
1299
1300         @Override
1301         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1302             ++count;
1303             if (genException) {
1304                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1305             }
1306
1307             operation.setSubRequestId(String.valueOf(attempt));
1308
1309             if (count > maxFailures) {
1310                 operation.setResult(PolicyResult.SUCCESS);
1311             } else {
1312                 operation.setResult(PolicyResult.FAILURE);
1313             }
1314
1315             return operation;
1316         }
1317
1318         @Override
1319         protected long getRetryWaitMs() {
1320             /*
1321              * Sleep timers run in the background, but we want to control things via the
1322              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1323              */
1324             return 0L;
1325         }
1326
1327         @Override
1328         protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
1329             return (preProc != null ? preProc : super.startPreprocessorAsync());
1330         }
1331     }
1332 }