229e5a32a3359b628d24e10498bbbc288a91fd0f
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import ch.qos.logback.classic.Logger;
36 import java.time.Instant;
37 import java.util.ArrayDeque;
38 import java.util.Arrays;
39 import java.util.Deque;
40 import java.util.LinkedList;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.UUID;
45 import java.util.concurrent.CompletableFuture;
46 import java.util.concurrent.CompletionException;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.ForkJoinPool;
49 import java.util.concurrent.Future;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import java.util.concurrent.atomic.AtomicReference;
53 import java.util.function.Consumer;
54 import java.util.function.Supplier;
55 import java.util.stream.Collectors;
56 import lombok.Getter;
57 import lombok.Setter;
58 import org.junit.AfterClass;
59 import org.junit.Before;
60 import org.junit.BeforeClass;
61 import org.junit.Test;
62 import org.mockito.ArgumentCaptor;
63 import org.mockito.Mock;
64 import org.mockito.MockitoAnnotations;
65 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
66 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
67 import org.onap.policy.common.utils.coder.Coder;
68 import org.onap.policy.common.utils.coder.CoderException;
69 import org.onap.policy.common.utils.coder.StandardCoder;
70 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
71 import org.onap.policy.common.utils.time.PseudoExecutor;
72 import org.onap.policy.controlloop.ControlLoopOperation;
73 import org.onap.policy.controlloop.VirtualControlLoopEvent;
74 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
75 import org.onap.policy.controlloop.actorserviceprovider.Operation;
76 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
77 import org.onap.policy.controlloop.actorserviceprovider.Operator;
78 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
79 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
80 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
81 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
82 import org.onap.policy.controlloop.policy.PolicyResult;
83 import org.slf4j.LoggerFactory;
84
85 public class OperationPartialTest {
86     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
87     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
88     private static final int MAX_REQUESTS = 100;
89     private static final int MAX_PARALLEL = 10;
90     private static final String EXPECTED_EXCEPTION = "expected exception";
91     private static final String ACTOR = "my-actor";
92     private static final String OPERATION = "my-operation";
93     private static final String MY_SINK = "my-sink";
94     private static final String MY_SOURCE = "my-source";
95     private static final String MY_TARGET_ENTITY = "my-entity";
96     private static final String TEXT = "my-text";
97     private static final int TIMEOUT = 1000;
98     private static final UUID REQ_ID = UUID.randomUUID();
99
100     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
101                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
102
103     /**
104      * Used to attach an appender to the class' logger.
105      */
106     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
107     private static final ExtractAppender appender = new ExtractAppender();
108
109     @Mock
110     private ActorService service;
111     @Mock
112     private Actor guardActor;
113     @Mock
114     private Operator guardOperator;
115     @Mock
116     private Operation guardOperation;
117
118     private VirtualControlLoopEvent event;
119     private ControlLoopEventContext context;
120     private PseudoExecutor executor;
121     private ControlLoopOperationParams params;
122
123     private MyOper oper;
124
125     private int numStart;
126     private int numEnd;
127
128     private Instant tstart;
129
130     private OperationOutcome opstart;
131     private OperationOutcome opend;
132
133     private Deque<OperationOutcome> starts;
134     private Deque<OperationOutcome> ends;
135
136     private OperatorConfig config;
137
138     /**
139      * Attaches the appender to the logger.
140      */
141     @BeforeClass
142     public static void setUpBeforeClass() throws Exception {
143         /**
144          * Attach appender to the logger.
145          */
146         appender.setContext(logger.getLoggerContext());
147         appender.start();
148
149         logger.addAppender(appender);
150     }
151
152     /**
153      * Stops the appender.
154      */
155     @AfterClass
156     public static void tearDownAfterClass() {
157         appender.stop();
158     }
159
160     /**
161      * Initializes the fields, including {@link #oper}.
162      */
163     @Before
164     public void setUp() {
165         MockitoAnnotations.initMocks(this);
166
167         event = new VirtualControlLoopEvent();
168         event.setRequestId(REQ_ID);
169
170         context = new ControlLoopEventContext(event);
171         executor = new PseudoExecutor();
172
173         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
174                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
175                         .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
176
177         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
178         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
179         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
180         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
181
182         config = new OperatorConfig(executor);
183
184         oper = new MyOper();
185
186         tstart = null;
187
188         opstart = null;
189         opend = null;
190
191         starts = new ArrayDeque<>(10);
192         ends = new ArrayDeque<>(10);
193     }
194
195     @Test
196     public void testOperatorPartial_testGetActorName_testGetName() {
197         assertEquals(ACTOR, oper.getActorName());
198         assertEquals(OPERATION, oper.getName());
199         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
200     }
201
202     @Test
203     public void testGetBlockingThread() throws Exception {
204         CompletableFuture<Void> future = new CompletableFuture<>();
205
206         // use the real executor
207         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
208             @Override
209             public Operation buildOperation(ControlLoopOperationParams params) {
210                 return null;
211             }
212         };
213
214         oper2.getBlockingExecutor().execute(() -> future.complete(null));
215
216         assertNull(future.get(5, TimeUnit.SECONDS));
217     }
218
219     @Test
220     public void testStart() {
221         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
222     }
223
224     /**
225      * Tests start() with multiple running requests.
226      */
227     @Test
228     public void testStartMultiple() {
229         for (int count = 0; count < MAX_PARALLEL; ++count) {
230             oper.start();
231         }
232
233         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
234
235         assertNotNull(opstart);
236         assertNotNull(opend);
237         assertEquals(PolicyResult.SUCCESS, opend.getResult());
238
239         assertEquals(MAX_PARALLEL, numStart);
240         assertEquals(MAX_PARALLEL, oper.getCount());
241         assertEquals(MAX_PARALLEL, numEnd);
242     }
243
244     /**
245      * Tests startPreprocessor() when the preprocessor returns a failure.
246      */
247     @Test
248     public void testStartPreprocessorFailure() {
249         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
250
251         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
252     }
253
254     /**
255      * Tests startPreprocessor() when the preprocessor throws an exception.
256      */
257     @Test
258     public void testStartPreprocessorException() {
259         // arrange for the preprocessor to throw an exception
260         oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
261
262         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
263     }
264
265     /**
266      * Tests startPreprocessor() when the pipeline is not running.
267      */
268     @Test
269     public void testStartPreprocessorNotRunning() {
270         // arrange for the preprocessor to return success, which will be ignored
271         // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
272
273         oper.start().cancel(false);
274         assertTrue(executor.runAll(MAX_REQUESTS));
275
276         assertNull(opstart);
277         assertNull(opend);
278
279         assertEquals(0, numStart);
280         assertEquals(0, oper.getCount());
281         assertEquals(0, numEnd);
282     }
283
284     /**
285      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
286      */
287     @Test
288     public void testStartPreprocessorBuilderException() {
289         oper = new MyOper() {
290             @Override
291             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
292                 throw new IllegalStateException(EXPECTED_EXCEPTION);
293             }
294         };
295
296         assertThatIllegalStateException().isThrownBy(() -> oper.start());
297
298         // should be nothing in the queue
299         assertEquals(0, executor.getQueueLength());
300     }
301
302     @Test
303     public void testStartPreprocessorAsync() {
304         assertNull(oper.startPreprocessorAsync());
305     }
306
307     @Test
308     public void testStartGuardAsync() throws Exception {
309         CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
310         assertTrue(future.isDone());
311         assertEquals(PolicyResult.SUCCESS, future.get().getResult());
312
313         // verify the parameters that were passed
314         ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
315                         ArgumentCaptor.forClass(ControlLoopOperationParams.class);
316         verify(guardOperator).buildOperation(paramsCaptor.capture());
317
318         params = paramsCaptor.getValue();
319         assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
320         assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
321         assertNull(params.getRetry());
322         assertNull(params.getTimeoutSec());
323
324         Map<String, Object> payload = params.getPayload();
325         assertNotNull(payload);
326
327         @SuppressWarnings("unchecked")
328         Map<String, Object> resource = (Map<String, Object>) payload.get("resource");
329         assertNotNull(resource);
330
331         @SuppressWarnings("unchecked")
332         Map<String, Object> guard = (Map<String, Object>) resource.get("guard");
333         assertEquals(oper.makeGuardPayload(), guard);
334     }
335
336     @Test
337     public void testMakeGuardPayload() {
338         Map<String, Object> payload = oper.makeGuardPayload();
339         assertSame(REQ_ID, payload.get("requestId"));
340
341         // request id changes, so remove it
342         payload.remove("requestId");
343
344         assertEquals("{actor=my-actor, recipe=my-operation, target=my-entity}", payload.toString());
345
346         // repeat, but with closed loop name
347         event.setClosedLoopControlName("my-loop");
348         payload = oper.makeGuardPayload();
349         payload.remove("requestId");
350         assertEquals("{actor=my-actor, recipe=my-operation, target=my-entity, clname=my-loop}", payload.toString());
351     }
352
353     @Test
354     public void testStartOperationAsync() {
355         oper.start();
356         assertTrue(executor.runAll(MAX_REQUESTS));
357
358         assertEquals(1, oper.getCount());
359     }
360
361     @Test
362     public void testIsSuccess() {
363         OperationOutcome outcome = new OperationOutcome();
364
365         outcome.setResult(PolicyResult.SUCCESS);
366         assertTrue(oper.isSuccess(outcome));
367
368         for (PolicyResult failure : FAILURE_RESULTS) {
369             outcome.setResult(failure);
370             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
371         }
372     }
373
374     @Test
375     public void testIsActorFailed() {
376         assertFalse(oper.isActorFailed(null));
377
378         OperationOutcome outcome = params.makeOutcome();
379
380         // incorrect outcome
381         outcome.setResult(PolicyResult.SUCCESS);
382         assertFalse(oper.isActorFailed(outcome));
383
384         outcome.setResult(PolicyResult.FAILURE_RETRIES);
385         assertFalse(oper.isActorFailed(outcome));
386
387         // correct outcome
388         outcome.setResult(PolicyResult.FAILURE);
389
390         // incorrect actor
391         outcome.setActor(MY_SINK);
392         assertFalse(oper.isActorFailed(outcome));
393         outcome.setActor(null);
394         assertFalse(oper.isActorFailed(outcome));
395         outcome.setActor(ACTOR);
396
397         // incorrect operation
398         outcome.setOperation(MY_SINK);
399         assertFalse(oper.isActorFailed(outcome));
400         outcome.setOperation(null);
401         assertFalse(oper.isActorFailed(outcome));
402         outcome.setOperation(OPERATION);
403
404         // correct values
405         assertTrue(oper.isActorFailed(outcome));
406     }
407
408     @Test
409     public void testDoOperation() {
410         /*
411          * Use an operation that doesn't override doOperation().
412          */
413         OperationPartial oper2 = new OperationPartial(params, config) {};
414
415         oper2.start();
416         assertTrue(executor.runAll(MAX_REQUESTS));
417
418         assertNotNull(opend);
419         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
420     }
421
422     @Test
423     public void testTimeout() throws Exception {
424
425         // use a real executor
426         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
427
428         // trigger timeout very quickly
429         oper = new MyOper() {
430             @Override
431             protected long getTimeoutMs(Integer timeoutSec) {
432                 return 1;
433             }
434
435             @Override
436             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
437
438                 OperationOutcome outcome2 = params.makeOutcome();
439                 outcome2.setResult(PolicyResult.SUCCESS);
440
441                 /*
442                  * Create an incomplete future that will timeout after the operation's
443                  * timeout. If it fires before the other timer, then it will return a
444                  * SUCCESS outcome.
445                  */
446                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
447                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
448                                 params.getExecutor());
449
450                 return future;
451             }
452         };
453
454         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
455     }
456
457     /**
458      * Tests retry functions, when the count is set to zero and retries are exhausted.
459      */
460     @Test
461     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
462         params = params.toBuilder().retry(0).build();
463
464         // new params, thus need a new operation
465         oper = new MyOper();
466
467         oper.setMaxFailures(10);
468
469         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
470     }
471
472     /**
473      * Tests retry functions, when the count is null and retries are exhausted.
474      */
475     @Test
476     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
477         params = params.toBuilder().retry(null).build();
478
479         // new params, thus need a new operation
480         oper = new MyOper();
481
482         oper.setMaxFailures(10);
483
484         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
485     }
486
487     /**
488      * Tests retry functions, when retries are exhausted.
489      */
490     @Test
491     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
492         final int maxRetries = 3;
493         params = params.toBuilder().retry(maxRetries).build();
494
495         // new params, thus need a new operation
496         oper = new MyOper();
497
498         oper.setMaxFailures(10);
499
500         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
501                         PolicyResult.FAILURE_RETRIES);
502     }
503
504     /**
505      * Tests retry functions, when a success follows some retries.
506      */
507     @Test
508     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
509         params = params.toBuilder().retry(10).build();
510
511         // new params, thus need a new operation
512         oper = new MyOper();
513
514         final int maxFailures = 3;
515         oper.setMaxFailures(maxFailures);
516
517         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
518                         PolicyResult.SUCCESS);
519     }
520
521     /**
522      * Tests retry functions, when the outcome is {@code null}.
523      */
524     @Test
525     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
526
527         // arrange to return null from doOperation()
528         oper = new MyOper() {
529             @Override
530             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
531
532                 // update counters
533                 super.doOperation(attempt, outcome);
534                 return null;
535             }
536         };
537
538         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
539     }
540
541     @Test
542     public void testSleep() throws Exception {
543         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
544         assertTrue(future.isDone());
545         assertNull(future.get());
546
547         // edge case
548         future = oper.sleep(0, TimeUnit.SECONDS);
549         assertTrue(future.isDone());
550         assertNull(future.get());
551
552         /*
553          * Start a second sleep we can use to check the first while it's running.
554          */
555         tstart = Instant.now();
556         future = oper.sleep(100, TimeUnit.MILLISECONDS);
557
558         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
559
560         // wait for second to complete and verify that the first has not completed
561         future2.get();
562         assertFalse(future.isDone());
563
564         // wait for second to complete
565         future.get();
566
567         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
568         assertTrue(diff >= 99);
569     }
570
571     @Test
572     public void testIsSameOperation() {
573         assertFalse(oper.isSameOperation(null));
574
575         OperationOutcome outcome = params.makeOutcome();
576
577         // wrong actor - should be false
578         outcome.setActor(null);
579         assertFalse(oper.isSameOperation(outcome));
580         outcome.setActor(MY_SINK);
581         assertFalse(oper.isSameOperation(outcome));
582         outcome.setActor(ACTOR);
583
584         // wrong operation - should be null
585         outcome.setOperation(null);
586         assertFalse(oper.isSameOperation(outcome));
587         outcome.setOperation(MY_SINK);
588         assertFalse(oper.isSameOperation(outcome));
589         outcome.setOperation(OPERATION);
590
591         assertTrue(oper.isSameOperation(outcome));
592     }
593
594     /**
595      * Tests handleFailure() when the outcome is a success.
596      */
597     @Test
598     public void testHandlePreprocessorFailureSuccess() {
599         oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
600         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
601     }
602
603     /**
604      * Tests handleFailure() when the outcome is <i>not</i> a success.
605      */
606     @Test
607     public void testHandlePreprocessorFailureFailed() throws Exception {
608         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
609         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
610     }
611
612     /**
613      * Tests handleFailure() when the outcome is {@code null}.
614      */
615     @Test
616     public void testHandlePreprocessorFailureNull() throws Exception {
617         // arrange to return a null outcome from the preprocessor
618         oper.setPreProc(CompletableFuture.completedFuture(null));
619         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
620     }
621
622     @Test
623     public void testFromException() {
624         // arrange to generate an exception when operation runs
625         oper.setGenException(true);
626
627         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
628     }
629
630     /**
631      * Tests fromException() when there is no exception.
632      */
633     @Test
634     public void testFromExceptionNoExcept() {
635         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
636     }
637
638     /**
639      * Tests both flavors of anyOf(), because one invokes the other.
640      */
641     @Test
642     public void testAnyOf() throws Exception {
643         // first task completes, others do not
644         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
645
646         final OperationOutcome outcome = params.makeOutcome();
647
648         tasks.add(() -> CompletableFuture.completedFuture(outcome));
649         tasks.add(() -> new CompletableFuture<>());
650         tasks.add(() -> null);
651         tasks.add(() -> new CompletableFuture<>());
652
653         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
654         assertTrue(executor.runAll(MAX_REQUESTS));
655         assertTrue(result.isDone());
656         assertSame(outcome, result.get());
657
658         // repeat using array form
659         @SuppressWarnings("unchecked")
660         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
661         result = oper.anyOf(tasks.toArray(taskArray));
662         assertTrue(executor.runAll(MAX_REQUESTS));
663         assertTrue(result.isDone());
664         assertSame(outcome, result.get());
665
666         // second task completes, others do not
667         tasks.clear();
668         tasks.add(() -> new CompletableFuture<>());
669         tasks.add(() -> CompletableFuture.completedFuture(outcome));
670         tasks.add(() -> new CompletableFuture<>());
671
672         result = oper.anyOf(tasks);
673         assertTrue(executor.runAll(MAX_REQUESTS));
674         assertTrue(result.isDone());
675         assertSame(outcome, result.get());
676
677         // third task completes, others do not
678         tasks.clear();
679         tasks.add(() -> new CompletableFuture<>());
680         tasks.add(() -> new CompletableFuture<>());
681         tasks.add(() -> CompletableFuture.completedFuture(outcome));
682
683         result = oper.anyOf(tasks);
684         assertTrue(executor.runAll(MAX_REQUESTS));
685         assertTrue(result.isDone());
686         assertSame(outcome, result.get());
687     }
688
689     /**
690      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
691      */
692     @Test
693     @SuppressWarnings("unchecked")
694     public void testAnyOfEdge() throws Exception {
695         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
696
697         // zero items: check both using a list and using an array
698         assertNull(oper.anyOf(tasks));
699         assertNull(oper.anyOf());
700
701         // one item: : check both using a list and using an array
702         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
703         tasks.add(() -> future1);
704
705         assertSame(future1, oper.anyOf(tasks));
706         assertSame(future1, oper.anyOf(() -> future1));
707     }
708
709     @Test
710     public void testAllOfArray() throws Exception {
711         final OperationOutcome outcome = params.makeOutcome();
712
713         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
714         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
715         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
716
717         @SuppressWarnings("unchecked")
718         CompletableFuture<OperationOutcome> result =
719                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
720
721         assertTrue(executor.runAll(MAX_REQUESTS));
722         assertFalse(result.isDone());
723         future1.complete(outcome);
724
725         // complete 3 before 2
726         assertTrue(executor.runAll(MAX_REQUESTS));
727         assertFalse(result.isDone());
728         future3.complete(outcome);
729
730         assertTrue(executor.runAll(MAX_REQUESTS));
731         assertFalse(result.isDone());
732         future2.complete(outcome);
733
734         // all of them are now done
735         assertTrue(executor.runAll(MAX_REQUESTS));
736         assertTrue(result.isDone());
737         assertSame(outcome, result.get());
738     }
739
740     @Test
741     public void testAllOfList() throws Exception {
742         final OperationOutcome outcome = params.makeOutcome();
743
744         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
745         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
746         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
747
748         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
749         tasks.add(() -> future1);
750         tasks.add(() -> future2);
751         tasks.add(() -> null);
752         tasks.add(() -> future3);
753
754         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
755
756         assertTrue(executor.runAll(MAX_REQUESTS));
757         assertFalse(result.isDone());
758         future1.complete(outcome);
759
760         // complete 3 before 2
761         assertTrue(executor.runAll(MAX_REQUESTS));
762         assertFalse(result.isDone());
763         future3.complete(outcome);
764
765         assertTrue(executor.runAll(MAX_REQUESTS));
766         assertFalse(result.isDone());
767         future2.complete(outcome);
768
769         // all of them are now done
770         assertTrue(executor.runAll(MAX_REQUESTS));
771         assertTrue(result.isDone());
772         assertSame(outcome, result.get());
773     }
774
775     /**
776      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
777      */
778     @Test
779     @SuppressWarnings("unchecked")
780     public void testAllOfEdge() throws Exception {
781         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
782
783         // zero items: check both using a list and using an array
784         assertNull(oper.allOf(tasks));
785         assertNull(oper.allOf());
786
787         // one item: : check both using a list and using an array
788         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
789         tasks.add(() -> future1);
790
791         assertSame(future1, oper.allOf(tasks));
792         assertSame(future1, oper.allOf(() -> future1));
793     }
794
795     @Test
796     public void testAttachFutures() throws Exception {
797         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
798
799         // third task throws an exception during construction
800         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
801         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
802         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
803         tasks.add(() -> future1);
804         tasks.add(() -> future2);
805         tasks.add(() -> {
806             throw new IllegalStateException(EXPECTED_EXCEPTION);
807         });
808         tasks.add(() -> future3);
809
810         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
811
812         // should have canceled the first two, but not the last
813         assertTrue(future1.isCancelled());
814         assertTrue(future2.isCancelled());
815         assertFalse(future3.isCancelled());
816     }
817
818     @Test
819     public void testCombineOutcomes() throws Exception {
820         // only one outcome
821         verifyOutcomes(0, PolicyResult.SUCCESS);
822         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
823
824         // maximum is in different positions
825         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
826         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
827         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
828
829         // null outcome - takes precedence over a success
830         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
831         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
832         tasks.add(() -> CompletableFuture.completedFuture(null));
833         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
834         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
835
836         assertTrue(executor.runAll(MAX_REQUESTS));
837         assertTrue(result.isDone());
838         assertNull(result.get());
839
840         // one throws an exception during execution
841         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
842
843         tasks.clear();
844         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
845         tasks.add(() -> CompletableFuture.failedFuture(except));
846         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
847         result = oper.allOf(tasks);
848
849         assertTrue(executor.runAll(MAX_REQUESTS));
850         assertTrue(result.isCompletedExceptionally());
851         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
852     }
853
854     /**
855      * Tests both flavors of sequence(), because one invokes the other.
856      */
857     @Test
858     public void testSequence() throws Exception {
859         final OperationOutcome outcome = params.makeOutcome();
860
861         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
862         tasks.add(() -> CompletableFuture.completedFuture(outcome));
863         tasks.add(() -> null);
864         tasks.add(() -> CompletableFuture.completedFuture(outcome));
865         tasks.add(() -> CompletableFuture.completedFuture(outcome));
866
867         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
868         assertTrue(executor.runAll(MAX_REQUESTS));
869         assertTrue(result.isDone());
870         assertSame(outcome, result.get());
871
872         // repeat using array form
873         @SuppressWarnings("unchecked")
874         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
875         result = oper.sequence(tasks.toArray(taskArray));
876         assertTrue(executor.runAll(MAX_REQUESTS));
877         assertTrue(result.isDone());
878         assertSame(outcome, result.get());
879
880         // second task fails, third should not run
881         OperationOutcome failure = params.makeOutcome();
882         failure.setResult(PolicyResult.FAILURE);
883         tasks.clear();
884         tasks.add(() -> CompletableFuture.completedFuture(outcome));
885         tasks.add(() -> CompletableFuture.completedFuture(failure));
886         tasks.add(() -> CompletableFuture.completedFuture(outcome));
887
888         result = oper.sequence(tasks);
889         assertTrue(executor.runAll(MAX_REQUESTS));
890         assertTrue(result.isDone());
891         assertSame(failure, result.get());
892     }
893
894     /**
895      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
896      */
897     @Test
898     @SuppressWarnings("unchecked")
899     public void testSequenceEdge() throws Exception {
900         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
901
902         // zero items: check both using a list and using an array
903         assertNull(oper.sequence(tasks));
904         assertNull(oper.sequence());
905
906         // one item: : check both using a list and using an array
907         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
908         tasks.add(() -> future1);
909
910         assertSame(future1, oper.sequence(tasks));
911         assertSame(future1, oper.sequence(() -> future1));
912     }
913
914     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
915         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
916
917         OperationOutcome expectedOutcome = null;
918
919         for (int count = 0; count < results.length; ++count) {
920             OperationOutcome outcome = params.makeOutcome();
921             outcome.setResult(results[count]);
922             tasks.add(() -> CompletableFuture.completedFuture(outcome));
923
924             if (count == expected) {
925                 expectedOutcome = outcome;
926             }
927         }
928
929         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
930
931         assertTrue(executor.runAll(MAX_REQUESTS));
932         assertTrue(result.isDone());
933         assertSame(expectedOutcome, result.get());
934     }
935
936     @Test
937     public void testDetmPriority() throws CoderException {
938         assertEquals(1, oper.detmPriority(null));
939
940         OperationOutcome outcome = params.makeOutcome();
941
942         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
943                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
944                         PolicyResult.FAILURE_EXCEPTION, 6);
945
946         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
947             outcome.setResult(ent.getKey());
948             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
949         }
950
951         /*
952          * Test null result. We can't actually set it to null, because the set() method
953          * won't allow it. Instead, we decode it from a structure.
954          */
955         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
956         assertEquals(1, oper.detmPriority(outcome));
957     }
958
959     /**
960      * Tests callbackStarted() when the pipeline has already been stopped.
961      */
962     @Test
963     public void testCallbackStartedNotRunning() {
964         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
965
966         /*
967          * arrange to stop the controller when the start-callback is invoked, but capture
968          * the outcome
969          */
970         params = params.toBuilder().startCallback(oper -> {
971             starter(oper);
972             future.get().cancel(false);
973         }).build();
974
975         // new params, thus need a new operation
976         oper = new MyOper();
977
978         future.set(oper.start());
979         assertTrue(executor.runAll(MAX_REQUESTS));
980
981         // should have only run once
982         assertEquals(1, numStart);
983     }
984
985     /**
986      * Tests callbackCompleted() when the pipeline has already been stopped.
987      */
988     @Test
989     public void testCallbackCompletedNotRunning() {
990         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
991
992         // arrange to stop the controller when the start-callback is invoked
993         params = params.toBuilder().startCallback(oper -> {
994             future.get().cancel(false);
995         }).build();
996
997         // new params, thus need a new operation
998         oper = new MyOper();
999
1000         future.set(oper.start());
1001         assertTrue(executor.runAll(MAX_REQUESTS));
1002
1003         // should not have been set
1004         assertNull(opend);
1005         assertEquals(0, numEnd);
1006     }
1007
1008     @Test
1009     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1010         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1011
1012         OperationOutcome outcome;
1013
1014         outcome = new OperationOutcome();
1015         oper.setOutcome(outcome, timex);
1016         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1017         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1018
1019         outcome = new OperationOutcome();
1020         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1021         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1022         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1023     }
1024
1025     @Test
1026     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1027         OperationOutcome outcome;
1028
1029         outcome = new OperationOutcome();
1030         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1031         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1032         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1033
1034         for (PolicyResult result : FAILURE_RESULTS) {
1035             outcome = new OperationOutcome();
1036             oper.setOutcome(outcome, result);
1037             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1038             assertEquals(result.toString(), result, outcome.getResult());
1039         }
1040     }
1041
1042     @Test
1043     public void testIsTimeout() {
1044         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1045
1046         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1047         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1048         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1049         assertFalse(oper.isTimeout(new CompletionException(null)));
1050         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1051
1052         assertTrue(oper.isTimeout(timex));
1053         assertTrue(oper.isTimeout(new CompletionException(timex)));
1054     }
1055
1056     @Test
1057     public void testLogMessage() {
1058         final String infraStr = SINK_INFRA.toString();
1059
1060         // log structured data
1061         appender.clearExtractions();
1062         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1063         List<String> output = appender.getExtracted();
1064         assertEquals(1, output.size());
1065
1066         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1067                         .contains("{\n  \"text\": \"my-text\"\n}");
1068
1069         // repeat with a response
1070         appender.clearExtractions();
1071         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1072         output = appender.getExtracted();
1073         assertEquals(1, output.size());
1074
1075         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1076                         .contains("{\n  \"text\": \"my-text\"\n}");
1077
1078         // log a plain string
1079         appender.clearExtractions();
1080         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1081         output = appender.getExtracted();
1082         assertEquals(1, output.size());
1083         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1084
1085         // log a null request
1086         appender.clearExtractions();
1087         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1088         output = appender.getExtracted();
1089         assertEquals(1, output.size());
1090
1091         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1092
1093         // generate exception from coder
1094         setOperCoderException();
1095
1096         appender.clearExtractions();
1097         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1098         output = appender.getExtracted();
1099         assertEquals(2, output.size());
1100         assertThat(output.get(0)).contains("cannot pretty-print request");
1101         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1102
1103         // repeat with a response
1104         appender.clearExtractions();
1105         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1106         output = appender.getExtracted();
1107         assertEquals(2, output.size());
1108         assertThat(output.get(0)).contains("cannot pretty-print response");
1109         assertThat(output.get(1)).contains(MY_SOURCE);
1110     }
1111
1112     @Test
1113     public void testGetRetry() {
1114         assertEquals(0, oper.getRetry(null));
1115         assertEquals(10, oper.getRetry(10));
1116     }
1117
1118     @Test
1119     public void testGetRetryWait() {
1120         // need an operator that doesn't override the retry time
1121         OperationPartial oper2 = new OperationPartial(params, config) {};
1122         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1123     }
1124
1125     @Test
1126     public void testGetTimeOutMs() {
1127         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1128
1129         params = params.toBuilder().timeoutSec(null).build();
1130
1131         // new params, thus need a new operation
1132         oper = new MyOper();
1133
1134         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1135     }
1136
1137     private void starter(OperationOutcome oper) {
1138         ++numStart;
1139         tstart = oper.getStart();
1140         opstart = oper;
1141         starts.add(oper);
1142     }
1143
1144     private void completer(OperationOutcome oper) {
1145         ++numEnd;
1146         opend = oper;
1147         ends.add(oper);
1148     }
1149
1150     /**
1151      * Gets a function that does nothing.
1152      *
1153      * @param <T> type of input parameter expected by the function
1154      * @return a function that does nothing
1155      */
1156     private <T> Consumer<T> noop() {
1157         return unused -> {
1158         };
1159     }
1160
1161     private OperationOutcome makeSuccess() {
1162         OperationOutcome outcome = params.makeOutcome();
1163         outcome.setResult(PolicyResult.SUCCESS);
1164
1165         return outcome;
1166     }
1167
1168     private OperationOutcome makeFailure() {
1169         OperationOutcome outcome = params.makeOutcome();
1170         outcome.setResult(PolicyResult.FAILURE);
1171
1172         return outcome;
1173     }
1174
1175     /**
1176      * Verifies a run.
1177      *
1178      * @param testName test name
1179      * @param expectedCallbacks number of callbacks expected
1180      * @param expectedOperations number of operation invocations expected
1181      * @param expectedResult expected outcome
1182      */
1183     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1184                     PolicyResult expectedResult) {
1185
1186         String expectedSubRequestId =
1187                         (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1188
1189         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1190     }
1191
1192     /**
1193      * Verifies a run.
1194      *
1195      * @param testName test name
1196      * @param expectedCallbacks number of callbacks expected
1197      * @param expectedOperations number of operation invocations expected
1198      * @param expectedResult expected outcome
1199      * @param expectedSubRequestId expected sub request ID
1200      * @param manipulator function to modify the future returned by
1201      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1202      *        in the executor are run
1203      */
1204     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1205                     String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1206
1207         tstart = null;
1208         opstart = null;
1209         opend = null;
1210         starts.clear();
1211         ends.clear();
1212
1213         CompletableFuture<OperationOutcome> future = oper.start();
1214
1215         manipulator.accept(future);
1216
1217         assertTrue(testName, executor.runAll(MAX_REQUESTS));
1218
1219         assertEquals(testName, expectedCallbacks, numStart);
1220         assertEquals(testName, expectedCallbacks, numEnd);
1221
1222         if (expectedCallbacks > 0) {
1223             assertNotNull(testName, opstart);
1224             assertNotNull(testName, opend);
1225             assertEquals(testName, expectedResult, opend.getResult());
1226
1227             assertSame(testName, tstart, opstart.getStart());
1228             assertSame(testName, tstart, opend.getStart());
1229
1230             try {
1231                 assertTrue(future.isDone());
1232                 assertEquals(testName, opend, future.get());
1233
1234                 // "start" is never final
1235                 for (OperationOutcome outcome : starts) {
1236                     assertFalse(testName, outcome.isFinalOutcome());
1237                 }
1238
1239                 // only the last "complete" is final
1240                 assertTrue(testName, ends.removeLast().isFinalOutcome());
1241
1242                 for (OperationOutcome outcome : ends) {
1243                     assertFalse(outcome.isFinalOutcome());
1244                 }
1245
1246             } catch (InterruptedException | ExecutionException e) {
1247                 throw new IllegalStateException(e);
1248             }
1249
1250             if (expectedOperations > 0) {
1251                 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1252             }
1253         }
1254
1255         assertEquals(testName, expectedOperations, oper.getCount());
1256     }
1257
1258     /**
1259      * Creates a new {@link #oper} whose coder will throw an exception.
1260      */
1261     private void setOperCoderException() {
1262         oper = new MyOper() {
1263             @Override
1264             protected Coder makeCoder() {
1265                 return new StandardCoder() {
1266                     @Override
1267                     public String encode(Object object, boolean pretty) throws CoderException {
1268                         throw new CoderException(EXPECTED_EXCEPTION);
1269                     }
1270                 };
1271             }
1272         };
1273     }
1274
1275
1276     @Getter
1277     public static class MyData {
1278         private String text = TEXT;
1279     }
1280
1281
1282     private class MyOper extends OperationPartial {
1283         @Getter
1284         private int count = 0;
1285
1286         @Setter
1287         private boolean genException;
1288         @Setter
1289         private int maxFailures = 0;
1290         @Setter
1291         private CompletableFuture<OperationOutcome> preProc;
1292
1293
1294         public MyOper() {
1295             super(OperationPartialTest.this.params, config);
1296         }
1297
1298         @Override
1299         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1300             ++count;
1301             if (genException) {
1302                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1303             }
1304
1305             operation.setSubRequestId(String.valueOf(attempt));
1306
1307             if (count > maxFailures) {
1308                 operation.setResult(PolicyResult.SUCCESS);
1309             } else {
1310                 operation.setResult(PolicyResult.FAILURE);
1311             }
1312
1313             return operation;
1314         }
1315
1316         @Override
1317         protected long getRetryWaitMs() {
1318             /*
1319              * Sleep timers run in the background, but we want to control things via the
1320              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1321              */
1322             return 0L;
1323         }
1324
1325         @Override
1326         protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
1327             return (preProc != null ? preProc : super.startPreprocessorAsync());
1328         }
1329     }
1330 }