6d543582740c350644924a3ed4866a8aa48d2dd2
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import ch.qos.logback.classic.Logger;
36 import java.time.Instant;
37 import java.util.ArrayDeque;
38 import java.util.Arrays;
39 import java.util.Collections;
40 import java.util.Deque;
41 import java.util.LinkedList;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.Map.Entry;
45 import java.util.UUID;
46 import java.util.concurrent.CompletableFuture;
47 import java.util.concurrent.CompletionException;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.ForkJoinPool;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import java.util.concurrent.atomic.AtomicReference;
54 import java.util.function.Consumer;
55 import java.util.function.Supplier;
56 import java.util.stream.Collectors;
57 import lombok.Getter;
58 import lombok.Setter;
59 import org.junit.AfterClass;
60 import org.junit.Before;
61 import org.junit.BeforeClass;
62 import org.junit.Test;
63 import org.mockito.ArgumentCaptor;
64 import org.mockito.Mock;
65 import org.mockito.MockitoAnnotations;
66 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
67 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
68 import org.onap.policy.common.utils.coder.Coder;
69 import org.onap.policy.common.utils.coder.CoderException;
70 import org.onap.policy.common.utils.coder.StandardCoder;
71 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
72 import org.onap.policy.common.utils.time.PseudoExecutor;
73 import org.onap.policy.controlloop.ControlLoopOperation;
74 import org.onap.policy.controlloop.VirtualControlLoopEvent;
75 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
76 import org.onap.policy.controlloop.actorserviceprovider.Operation;
77 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
78 import org.onap.policy.controlloop.actorserviceprovider.Operator;
79 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
80 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
81 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
82 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
83 import org.onap.policy.controlloop.policy.PolicyResult;
84 import org.slf4j.LoggerFactory;
85
86 public class OperationPartialTest {
87     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
88     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
89     private static final int MAX_REQUESTS = 100;
90     private static final int MAX_PARALLEL = 10;
91     private static final String EXPECTED_EXCEPTION = "expected exception";
92     private static final String ACTOR = "my-actor";
93     private static final String OPERATION = "my-operation";
94     private static final String MY_SINK = "my-sink";
95     private static final String MY_SOURCE = "my-source";
96     private static final String MY_TARGET_ENTITY = "my-entity";
97     private static final String TEXT = "my-text";
98     private static final int TIMEOUT = 1000;
99     private static final UUID REQ_ID = UUID.randomUUID();
100
101     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
102                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
103
104     /**
105      * Used to attach an appender to the class' logger.
106      */
107     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
108     private static final ExtractAppender appender = new ExtractAppender();
109
110     private static final List<String> PROP_NAMES = List.of("hello", "world");
111
112     @Mock
113     private ActorService service;
114     @Mock
115     private Actor guardActor;
116     @Mock
117     private Operator guardOperator;
118     @Mock
119     private Operation guardOperation;
120
121     private VirtualControlLoopEvent event;
122     private ControlLoopEventContext context;
123     private PseudoExecutor executor;
124     private ControlLoopOperationParams params;
125
126     private MyOper oper;
127
128     private int numStart;
129     private int numEnd;
130
131     private Instant tstart;
132
133     private OperationOutcome opstart;
134     private OperationOutcome opend;
135
136     private Deque<OperationOutcome> starts;
137     private Deque<OperationOutcome> ends;
138
139     private OperatorConfig config;
140
141     /**
142      * Attaches the appender to the logger.
143      */
144     @BeforeClass
145     public static void setUpBeforeClass() throws Exception {
146         /*
147          * Attach appender to the logger.
148          */
149         appender.setContext(logger.getLoggerContext());
150         appender.start();
151
152         logger.addAppender(appender);
153     }
154
155     /**
156      * Stops the appender.
157      */
158     @AfterClass
159     public static void tearDownAfterClass() {
160         appender.stop();
161     }
162
163     /**
164      * Initializes the fields, including {@link #oper}.
165      */
166     @Before
167     public void setUp() {
168         MockitoAnnotations.initMocks(this);
169
170         event = new VirtualControlLoopEvent();
171         event.setRequestId(REQ_ID);
172
173         context = new ControlLoopEventContext(event);
174         executor = new PseudoExecutor();
175
176         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
177                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
178                         .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
179
180         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
181         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
182         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
183         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
184
185         config = new OperatorConfig(executor);
186
187         oper = new MyOper();
188
189         tstart = null;
190
191         opstart = null;
192         opend = null;
193
194         starts = new ArrayDeque<>(10);
195         ends = new ArrayDeque<>(10);
196     }
197
198     @Test
199     public void testOperatorPartial_testGetActorName_testGetName() {
200         assertEquals(ACTOR, oper.getActorName());
201         assertEquals(OPERATION, oper.getName());
202         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
203     }
204
205     @Test
206     public void testGetBlockingThread() throws Exception {
207         CompletableFuture<Void> future = new CompletableFuture<>();
208
209         // use the real executor
210         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
211             @Override
212             public Operation buildOperation(ControlLoopOperationParams params) {
213                 return null;
214             }
215         };
216
217         oper2.getBlockingExecutor().execute(() -> future.complete(null));
218
219         assertNull(future.get(5, TimeUnit.SECONDS));
220     }
221
222     @Test
223     public void testGetPropertyNames() {
224         assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
225     }
226
227     @Test
228     public void testGetProperty_testSetProperty() {
229         oper.setProperty("propertyA", "valueA");
230         oper.setProperty("propertyB", "valueB");
231         oper.setProperty("propertyC", 20);
232
233         assertEquals("valueA", oper.getProperty("propertyA"));
234         assertEquals("valueB", oper.getProperty("propertyB"));
235         assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
236     }
237
238     @Test
239     public void testStart() {
240         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
241     }
242
243     /**
244      * Tests start() with multiple running requests.
245      */
246     @Test
247     public void testStartMultiple() {
248         for (int count = 0; count < MAX_PARALLEL; ++count) {
249             oper.start();
250         }
251
252         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
253
254         assertNotNull(opstart);
255         assertNotNull(opend);
256         assertEquals(PolicyResult.SUCCESS, opend.getResult());
257
258         assertEquals(MAX_PARALLEL, numStart);
259         assertEquals(MAX_PARALLEL, oper.getCount());
260         assertEquals(MAX_PARALLEL, numEnd);
261     }
262
263     /**
264      * Tests startPreprocessor() when the preprocessor returns a failure.
265      */
266     @Test
267     public void testStartPreprocessorFailure() {
268         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
269
270         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
271     }
272
273     /**
274      * Tests startPreprocessor() when the preprocessor throws an exception.
275      */
276     @Test
277     public void testStartPreprocessorException() {
278         // arrange for the preprocessor to throw an exception
279         oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
280
281         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
282     }
283
284     /**
285      * Tests startPreprocessor() when the pipeline is not running.
286      */
287     @Test
288     public void testStartPreprocessorNotRunning() {
289         // arrange for the preprocessor to return success, which will be ignored
290         // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
291
292         oper.start().cancel(false);
293         assertTrue(executor.runAll(MAX_REQUESTS));
294
295         assertNull(opstart);
296         assertNull(opend);
297
298         assertEquals(0, numStart);
299         assertEquals(0, oper.getCount());
300         assertEquals(0, numEnd);
301     }
302
303     /**
304      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
305      */
306     @Test
307     public void testStartPreprocessorBuilderException() {
308         oper = new MyOper() {
309             @Override
310             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
311                 throw new IllegalStateException(EXPECTED_EXCEPTION);
312             }
313         };
314
315         assertThatIllegalStateException().isThrownBy(() -> oper.start());
316
317         // should be nothing in the queue
318         assertEquals(0, executor.getQueueLength());
319     }
320
321     @Test
322     public void testStartPreprocessorAsync() {
323         assertNull(oper.startPreprocessorAsync());
324     }
325
326     @Test
327     public void testStartGuardAsync() throws Exception {
328         CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
329         assertTrue(future.isDone());
330         assertEquals(PolicyResult.SUCCESS, future.get().getResult());
331
332         // verify the parameters that were passed
333         ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
334                         ArgumentCaptor.forClass(ControlLoopOperationParams.class);
335         verify(guardOperator).buildOperation(paramsCaptor.capture());
336
337         params = paramsCaptor.getValue();
338         assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
339         assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
340         assertNull(params.getRetry());
341         assertNull(params.getTimeoutSec());
342
343         Map<String, Object> payload = params.getPayload();
344         assertNotNull(payload);
345
346         assertEquals(oper.makeGuardPayload(), payload);
347     }
348
349     /**
350      * Tests startGuardAsync() when preprocessing is disabled.
351      */
352     @Test
353     public void testStartGuardAsyncDisabled() {
354         params = params.toBuilder().preprocessed(true).build();
355         assertNull(new MyOper().startGuardAsync());
356     }
357
358     @Test
359     public void testMakeGuardPayload() {
360         Map<String, Object> payload = oper.makeGuardPayload();
361         assertSame(REQ_ID, payload.get("requestId"));
362
363         // request id changes, so remove it
364         payload.remove("requestId");
365
366         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity}", payload.toString());
367
368         // repeat, but with closed loop name
369         event.setClosedLoopControlName("my-loop");
370         payload = oper.makeGuardPayload();
371         payload.remove("requestId");
372         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity, clname=my-loop}", payload.toString());
373     }
374
375     @Test
376     public void testStartOperationAsync() {
377         oper.start();
378         assertTrue(executor.runAll(MAX_REQUESTS));
379
380         assertEquals(1, oper.getCount());
381     }
382
383     @Test
384     public void testIsSuccess() {
385         assertFalse(oper.isSuccess(null));
386
387         OperationOutcome outcome = new OperationOutcome();
388
389         outcome.setResult(PolicyResult.SUCCESS);
390         assertTrue(oper.isSuccess(outcome));
391
392         for (PolicyResult failure : FAILURE_RESULTS) {
393             outcome.setResult(failure);
394             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
395         }
396     }
397
398     @Test
399     public void testIsActorFailed() {
400         assertFalse(oper.isActorFailed(null));
401
402         OperationOutcome outcome = params.makeOutcome();
403
404         // incorrect outcome
405         outcome.setResult(PolicyResult.SUCCESS);
406         assertFalse(oper.isActorFailed(outcome));
407
408         outcome.setResult(PolicyResult.FAILURE_RETRIES);
409         assertFalse(oper.isActorFailed(outcome));
410
411         // correct outcome
412         outcome.setResult(PolicyResult.FAILURE);
413
414         // incorrect actor
415         outcome.setActor(MY_SINK);
416         assertFalse(oper.isActorFailed(outcome));
417         outcome.setActor(null);
418         assertFalse(oper.isActorFailed(outcome));
419         outcome.setActor(ACTOR);
420
421         // incorrect operation
422         outcome.setOperation(MY_SINK);
423         assertFalse(oper.isActorFailed(outcome));
424         outcome.setOperation(null);
425         assertFalse(oper.isActorFailed(outcome));
426         outcome.setOperation(OPERATION);
427
428         // correct values
429         assertTrue(oper.isActorFailed(outcome));
430     }
431
432     @Test
433     public void testDoOperation() {
434         /*
435          * Use an operation that doesn't override doOperation().
436          */
437         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
438
439         oper2.start();
440         assertTrue(executor.runAll(MAX_REQUESTS));
441
442         assertNotNull(opend);
443         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
444     }
445
446     @Test
447     public void testTimeout() throws Exception {
448
449         // use a real executor
450         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
451
452         // trigger timeout very quickly
453         oper = new MyOper() {
454             @Override
455             protected long getTimeoutMs(Integer timeoutSec) {
456                 return 1;
457             }
458
459             @Override
460             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
461
462                 OperationOutcome outcome2 = params.makeOutcome();
463                 outcome2.setResult(PolicyResult.SUCCESS);
464
465                 /*
466                  * Create an incomplete future that will timeout after the operation's
467                  * timeout. If it fires before the other timer, then it will return a
468                  * SUCCESS outcome.
469                  */
470                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
471                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
472                                 params.getExecutor());
473
474                 return future;
475             }
476         };
477
478         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
479     }
480
481     /**
482      * Tests retry functions, when the count is set to zero and retries are exhausted.
483      */
484     @Test
485     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
486         params = params.toBuilder().retry(0).build();
487
488         // new params, thus need a new operation
489         oper = new MyOper();
490
491         oper.setMaxFailures(10);
492
493         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
494     }
495
496     /**
497      * Tests retry functions, when the count is null and retries are exhausted.
498      */
499     @Test
500     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
501         params = params.toBuilder().retry(null).build();
502
503         // new params, thus need a new operation
504         oper = new MyOper();
505
506         oper.setMaxFailures(10);
507
508         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
509     }
510
511     /**
512      * Tests retry functions, when retries are exhausted.
513      */
514     @Test
515     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
516         final int maxRetries = 3;
517         params = params.toBuilder().retry(maxRetries).build();
518
519         // new params, thus need a new operation
520         oper = new MyOper();
521
522         oper.setMaxFailures(10);
523
524         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
525                         PolicyResult.FAILURE_RETRIES);
526     }
527
528     /**
529      * Tests retry functions, when a success follows some retries.
530      */
531     @Test
532     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
533         params = params.toBuilder().retry(10).build();
534
535         // new params, thus need a new operation
536         oper = new MyOper();
537
538         final int maxFailures = 3;
539         oper.setMaxFailures(maxFailures);
540
541         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
542                         PolicyResult.SUCCESS);
543     }
544
545     /**
546      * Tests retry functions, when the outcome is {@code null}.
547      */
548     @Test
549     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
550
551         // arrange to return null from doOperation()
552         oper = new MyOper() {
553             @Override
554             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
555
556                 // update counters
557                 super.doOperation(attempt, outcome);
558                 return null;
559             }
560         };
561
562         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, noop());
563     }
564
565     @Test
566     public void testSleep() throws Exception {
567         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
568         assertTrue(future.isDone());
569         assertNull(future.get());
570
571         // edge case
572         future = oper.sleep(0, TimeUnit.SECONDS);
573         assertTrue(future.isDone());
574         assertNull(future.get());
575
576         /*
577          * Start a second sleep we can use to check the first while it's running.
578          */
579         tstart = Instant.now();
580         future = oper.sleep(100, TimeUnit.MILLISECONDS);
581
582         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
583
584         // wait for second to complete and verify that the first has not completed
585         future2.get();
586         assertFalse(future.isDone());
587
588         // wait for second to complete
589         future.get();
590
591         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
592         assertTrue(diff >= 99);
593     }
594
595     @Test
596     public void testIsSameOperation() {
597         assertFalse(oper.isSameOperation(null));
598
599         OperationOutcome outcome = params.makeOutcome();
600
601         // wrong actor - should be false
602         outcome.setActor(null);
603         assertFalse(oper.isSameOperation(outcome));
604         outcome.setActor(MY_SINK);
605         assertFalse(oper.isSameOperation(outcome));
606         outcome.setActor(ACTOR);
607
608         // wrong operation - should be null
609         outcome.setOperation(null);
610         assertFalse(oper.isSameOperation(outcome));
611         outcome.setOperation(MY_SINK);
612         assertFalse(oper.isSameOperation(outcome));
613         outcome.setOperation(OPERATION);
614
615         assertTrue(oper.isSameOperation(outcome));
616     }
617
618     /**
619      * Tests handleFailure() when the outcome is a success.
620      */
621     @Test
622     public void testHandlePreprocessorFailureSuccess() {
623         oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
624         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
625     }
626
627     /**
628      * Tests handleFailure() when the outcome is <i>not</i> a success.
629      */
630     @Test
631     public void testHandlePreprocessorFailureFailed() throws Exception {
632         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
633         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
634     }
635
636     /**
637      * Tests handleFailure() when the outcome is {@code null}.
638      */
639     @Test
640     public void testHandlePreprocessorFailureNull() throws Exception {
641         // arrange to return a null outcome from the preprocessor
642         oper.setPreProc(CompletableFuture.completedFuture(null));
643         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
644     }
645
646     @Test
647     public void testFromException() {
648         // arrange to generate an exception when operation runs
649         oper.setGenException(true);
650
651         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
652     }
653
654     /**
655      * Tests fromException() when there is no exception.
656      */
657     @Test
658     public void testFromExceptionNoExcept() {
659         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
660     }
661
662     /**
663      * Tests both flavors of anyOf(), because one invokes the other.
664      */
665     @Test
666     public void testAnyOf() throws Exception {
667         // first task completes, others do not
668         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
669
670         final OperationOutcome outcome = params.makeOutcome();
671
672         tasks.add(() -> CompletableFuture.completedFuture(outcome));
673         tasks.add(() -> new CompletableFuture<>());
674         tasks.add(() -> null);
675         tasks.add(() -> new CompletableFuture<>());
676
677         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
678         assertTrue(executor.runAll(MAX_REQUESTS));
679         assertTrue(result.isDone());
680         assertSame(outcome, result.get());
681
682         // repeat using array form
683         @SuppressWarnings("unchecked")
684         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
685         result = oper.anyOf(tasks.toArray(taskArray));
686         assertTrue(executor.runAll(MAX_REQUESTS));
687         assertTrue(result.isDone());
688         assertSame(outcome, result.get());
689
690         // second task completes, others do not
691         tasks.clear();
692         tasks.add(() -> new CompletableFuture<>());
693         tasks.add(() -> CompletableFuture.completedFuture(outcome));
694         tasks.add(() -> new CompletableFuture<>());
695
696         result = oper.anyOf(tasks);
697         assertTrue(executor.runAll(MAX_REQUESTS));
698         assertTrue(result.isDone());
699         assertSame(outcome, result.get());
700
701         // third task completes, others do not
702         tasks.clear();
703         tasks.add(() -> new CompletableFuture<>());
704         tasks.add(() -> new CompletableFuture<>());
705         tasks.add(() -> CompletableFuture.completedFuture(outcome));
706
707         result = oper.anyOf(tasks);
708         assertTrue(executor.runAll(MAX_REQUESTS));
709         assertTrue(result.isDone());
710         assertSame(outcome, result.get());
711     }
712
713     /**
714      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
715      */
716     @Test
717     @SuppressWarnings("unchecked")
718     public void testAnyOfEdge() throws Exception {
719         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
720
721         // zero items: check both using a list and using an array
722         assertNull(oper.anyOf(tasks));
723         assertNull(oper.anyOf());
724
725         // one item: : check both using a list and using an array
726         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
727         tasks.add(() -> future1);
728
729         assertSame(future1, oper.anyOf(tasks));
730         assertSame(future1, oper.anyOf(() -> future1));
731     }
732
733     @Test
734     public void testAllOfArray() throws Exception {
735         final OperationOutcome outcome = params.makeOutcome();
736
737         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
738         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
739         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
740
741         @SuppressWarnings("unchecked")
742         CompletableFuture<OperationOutcome> result =
743                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
744
745         assertTrue(executor.runAll(MAX_REQUESTS));
746         assertFalse(result.isDone());
747         future1.complete(outcome);
748
749         // complete 3 before 2
750         assertTrue(executor.runAll(MAX_REQUESTS));
751         assertFalse(result.isDone());
752         future3.complete(outcome);
753
754         assertTrue(executor.runAll(MAX_REQUESTS));
755         assertFalse(result.isDone());
756         future2.complete(outcome);
757
758         // all of them are now done
759         assertTrue(executor.runAll(MAX_REQUESTS));
760         assertTrue(result.isDone());
761         assertSame(outcome, result.get());
762     }
763
764     @Test
765     public void testAllOfList() throws Exception {
766         final OperationOutcome outcome = params.makeOutcome();
767
768         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
769         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
770         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
771
772         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
773         tasks.add(() -> future1);
774         tasks.add(() -> future2);
775         tasks.add(() -> null);
776         tasks.add(() -> future3);
777
778         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
779
780         assertTrue(executor.runAll(MAX_REQUESTS));
781         assertFalse(result.isDone());
782         future1.complete(outcome);
783
784         // complete 3 before 2
785         assertTrue(executor.runAll(MAX_REQUESTS));
786         assertFalse(result.isDone());
787         future3.complete(outcome);
788
789         assertTrue(executor.runAll(MAX_REQUESTS));
790         assertFalse(result.isDone());
791         future2.complete(outcome);
792
793         // all of them are now done
794         assertTrue(executor.runAll(MAX_REQUESTS));
795         assertTrue(result.isDone());
796         assertSame(outcome, result.get());
797     }
798
799     /**
800      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
801      */
802     @Test
803     @SuppressWarnings("unchecked")
804     public void testAllOfEdge() throws Exception {
805         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
806
807         // zero items: check both using a list and using an array
808         assertNull(oper.allOf(tasks));
809         assertNull(oper.allOf());
810
811         // one item: : check both using a list and using an array
812         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
813         tasks.add(() -> future1);
814
815         assertSame(future1, oper.allOf(tasks));
816         assertSame(future1, oper.allOf(() -> future1));
817     }
818
819     @Test
820     public void testAttachFutures() throws Exception {
821         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
822
823         // third task throws an exception during construction
824         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
825         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
826         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
827         tasks.add(() -> future1);
828         tasks.add(() -> future2);
829         tasks.add(() -> {
830             throw new IllegalStateException(EXPECTED_EXCEPTION);
831         });
832         tasks.add(() -> future3);
833
834         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
835
836         // should have canceled the first two, but not the last
837         assertTrue(future1.isCancelled());
838         assertTrue(future2.isCancelled());
839         assertFalse(future3.isCancelled());
840     }
841
842     @Test
843     public void testCombineOutcomes() throws Exception {
844         // only one outcome
845         verifyOutcomes(0, PolicyResult.SUCCESS);
846         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
847
848         // maximum is in different positions
849         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
850         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
851         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
852
853         // null outcome - takes precedence over a success
854         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
855         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
856         tasks.add(() -> CompletableFuture.completedFuture(null));
857         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
858         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
859
860         assertTrue(executor.runAll(MAX_REQUESTS));
861         assertTrue(result.isDone());
862         assertNull(result.get());
863
864         // one throws an exception during execution
865         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
866
867         tasks.clear();
868         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
869         tasks.add(() -> CompletableFuture.failedFuture(except));
870         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
871         result = oper.allOf(tasks);
872
873         assertTrue(executor.runAll(MAX_REQUESTS));
874         assertTrue(result.isCompletedExceptionally());
875         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
876     }
877
878     /**
879      * Tests both flavors of sequence(), because one invokes the other.
880      */
881     @Test
882     public void testSequence() throws Exception {
883         final OperationOutcome outcome = params.makeOutcome();
884
885         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
886         tasks.add(() -> CompletableFuture.completedFuture(outcome));
887         tasks.add(() -> null);
888         tasks.add(() -> CompletableFuture.completedFuture(outcome));
889         tasks.add(() -> CompletableFuture.completedFuture(outcome));
890
891         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
892         assertTrue(executor.runAll(MAX_REQUESTS));
893         assertTrue(result.isDone());
894         assertSame(outcome, result.get());
895
896         // repeat using array form
897         @SuppressWarnings("unchecked")
898         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
899         result = oper.sequence(tasks.toArray(taskArray));
900         assertTrue(executor.runAll(MAX_REQUESTS));
901         assertTrue(result.isDone());
902         assertSame(outcome, result.get());
903
904         // second task fails, third should not run
905         OperationOutcome failure = params.makeOutcome();
906         failure.setResult(PolicyResult.FAILURE);
907         tasks.clear();
908         tasks.add(() -> CompletableFuture.completedFuture(outcome));
909         tasks.add(() -> CompletableFuture.completedFuture(failure));
910         tasks.add(() -> CompletableFuture.completedFuture(outcome));
911
912         result = oper.sequence(tasks);
913         assertTrue(executor.runAll(MAX_REQUESTS));
914         assertTrue(result.isDone());
915         assertSame(failure, result.get());
916     }
917
918     /**
919      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
920      */
921     @Test
922     @SuppressWarnings("unchecked")
923     public void testSequenceEdge() throws Exception {
924         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
925
926         // zero items: check both using a list and using an array
927         assertNull(oper.sequence(tasks));
928         assertNull(oper.sequence());
929
930         // one item: : check both using a list and using an array
931         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
932         tasks.add(() -> future1);
933
934         assertSame(future1, oper.sequence(tasks));
935         assertSame(future1, oper.sequence(() -> future1));
936     }
937
938     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
939         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
940
941         OperationOutcome expectedOutcome = null;
942
943         for (int count = 0; count < results.length; ++count) {
944             OperationOutcome outcome = params.makeOutcome();
945             outcome.setResult(results[count]);
946             tasks.add(() -> CompletableFuture.completedFuture(outcome));
947
948             if (count == expected) {
949                 expectedOutcome = outcome;
950             }
951         }
952
953         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
954
955         assertTrue(executor.runAll(MAX_REQUESTS));
956         assertTrue(result.isDone());
957         assertSame(expectedOutcome, result.get());
958     }
959
960     @Test
961     public void testDetmPriority() throws CoderException {
962         assertEquals(1, oper.detmPriority(null));
963
964         OperationOutcome outcome = params.makeOutcome();
965
966         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
967                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
968                         PolicyResult.FAILURE_EXCEPTION, 6);
969
970         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
971             outcome.setResult(ent.getKey());
972             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
973         }
974
975         /*
976          * Test null result. We can't actually set it to null, because the set() method
977          * won't allow it. Instead, we decode it from a structure.
978          */
979         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
980         assertEquals(1, oper.detmPriority(outcome));
981     }
982
983     /**
984      * Tests callbackStarted() when the pipeline has already been stopped.
985      */
986     @Test
987     public void testCallbackStartedNotRunning() {
988         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
989
990         /*
991          * arrange to stop the controller when the start-callback is invoked, but capture
992          * the outcome
993          */
994         params = params.toBuilder().startCallback(oper -> {
995             starter(oper);
996             future.get().cancel(false);
997         }).build();
998
999         // new params, thus need a new operation
1000         oper = new MyOper();
1001
1002         future.set(oper.start());
1003         assertTrue(executor.runAll(MAX_REQUESTS));
1004
1005         // should have only run once
1006         assertEquals(1, numStart);
1007     }
1008
1009     /**
1010      * Tests callbackCompleted() when the pipeline has already been stopped.
1011      */
1012     @Test
1013     public void testCallbackCompletedNotRunning() {
1014         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1015
1016         // arrange to stop the controller when the start-callback is invoked
1017         params = params.toBuilder().startCallback(oper -> {
1018             future.get().cancel(false);
1019         }).build();
1020
1021         // new params, thus need a new operation
1022         oper = new MyOper();
1023
1024         future.set(oper.start());
1025         assertTrue(executor.runAll(MAX_REQUESTS));
1026
1027         // should not have been set
1028         assertNull(opend);
1029         assertEquals(0, numEnd);
1030     }
1031
1032     @Test
1033     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1034         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1035
1036         OperationOutcome outcome;
1037
1038         outcome = new OperationOutcome();
1039         oper.setOutcome(outcome, timex);
1040         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1041         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1042
1043         outcome = new OperationOutcome();
1044         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1045         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1046         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1047     }
1048
1049     @Test
1050     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1051         OperationOutcome outcome;
1052
1053         outcome = new OperationOutcome();
1054         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1055         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1056         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1057
1058         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1059         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1060         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1061
1062         for (PolicyResult result : FAILURE_RESULTS) {
1063             outcome = new OperationOutcome();
1064             oper.setOutcome(outcome, result);
1065             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1066             assertEquals(result.toString(), result, outcome.getResult());
1067         }
1068     }
1069
1070     @Test
1071     public void testIsTimeout() {
1072         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1073
1074         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1075         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1076         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1077         assertFalse(oper.isTimeout(new CompletionException(null)));
1078         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1079
1080         assertTrue(oper.isTimeout(timex));
1081         assertTrue(oper.isTimeout(new CompletionException(timex)));
1082     }
1083
1084     @Test
1085     public void testLogMessage() {
1086         final String infraStr = SINK_INFRA.toString();
1087
1088         // log structured data
1089         appender.clearExtractions();
1090         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1091         List<String> output = appender.getExtracted();
1092         assertEquals(1, output.size());
1093
1094         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1095                         .contains("{\n  \"text\": \"my-text\"\n}");
1096
1097         // repeat with a response
1098         appender.clearExtractions();
1099         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1100         output = appender.getExtracted();
1101         assertEquals(1, output.size());
1102
1103         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1104                         .contains("{\n  \"text\": \"my-text\"\n}");
1105
1106         // log a plain string
1107         appender.clearExtractions();
1108         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1109         output = appender.getExtracted();
1110         assertEquals(1, output.size());
1111         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1112
1113         // log a null request
1114         appender.clearExtractions();
1115         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1116         output = appender.getExtracted();
1117         assertEquals(1, output.size());
1118
1119         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1120
1121         // generate exception from coder
1122         setOperCoderException();
1123
1124         appender.clearExtractions();
1125         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1126         output = appender.getExtracted();
1127         assertEquals(2, output.size());
1128         assertThat(output.get(0)).contains("cannot pretty-print request");
1129         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1130
1131         // repeat with a response
1132         appender.clearExtractions();
1133         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1134         output = appender.getExtracted();
1135         assertEquals(2, output.size());
1136         assertThat(output.get(0)).contains("cannot pretty-print response");
1137         assertThat(output.get(1)).contains(MY_SOURCE);
1138     }
1139
1140     @Test
1141     public void testGetRetry() {
1142         assertEquals(0, oper.getRetry(null));
1143         assertEquals(10, oper.getRetry(10));
1144     }
1145
1146     @Test
1147     public void testGetRetryWait() {
1148         // need an operator that doesn't override the retry time
1149         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1150         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1151     }
1152
1153     @Test
1154     public void testGetTimeOutMs() {
1155         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1156
1157         params = params.toBuilder().timeoutSec(null).build();
1158
1159         // new params, thus need a new operation
1160         oper = new MyOper();
1161
1162         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1163     }
1164
1165     private void starter(OperationOutcome oper) {
1166         ++numStart;
1167         tstart = oper.getStart();
1168         opstart = oper;
1169         starts.add(oper);
1170     }
1171
1172     private void completer(OperationOutcome oper) {
1173         ++numEnd;
1174         opend = oper;
1175         ends.add(oper);
1176     }
1177
1178     /**
1179      * Gets a function that does nothing.
1180      *
1181      * @param <T> type of input parameter expected by the function
1182      * @return a function that does nothing
1183      */
1184     private <T> Consumer<T> noop() {
1185         return unused -> {
1186         };
1187     }
1188
1189     private OperationOutcome makeSuccess() {
1190         OperationOutcome outcome = params.makeOutcome();
1191         outcome.setResult(PolicyResult.SUCCESS);
1192
1193         return outcome;
1194     }
1195
1196     private OperationOutcome makeFailure() {
1197         OperationOutcome outcome = params.makeOutcome();
1198         outcome.setResult(PolicyResult.FAILURE);
1199
1200         return outcome;
1201     }
1202
1203     /**
1204      * Verifies a run.
1205      *
1206      * @param testName test name
1207      * @param expectedCallbacks number of callbacks expected
1208      * @param expectedOperations number of operation invocations expected
1209      * @param expectedResult expected outcome
1210      */
1211     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1212                     PolicyResult expectedResult) {
1213
1214         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
1215     }
1216
1217     /**
1218      * Verifies a run.
1219      *
1220      * @param testName test name
1221      * @param expectedCallbacks number of callbacks expected
1222      * @param expectedOperations number of operation invocations expected
1223      * @param expectedResult expected outcome
1224      * @param manipulator function to modify the future returned by
1225      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1226      *        in the executor are run
1227      */
1228     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1229                     Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1230
1231         tstart = null;
1232         opstart = null;
1233         opend = null;
1234         starts.clear();
1235         ends.clear();
1236
1237         CompletableFuture<OperationOutcome> future = oper.start();
1238
1239         manipulator.accept(future);
1240
1241         assertTrue(testName, executor.runAll(MAX_REQUESTS));
1242
1243         assertEquals(testName, expectedCallbacks, numStart);
1244         assertEquals(testName, expectedCallbacks, numEnd);
1245
1246         if (expectedCallbacks > 0) {
1247             assertNotNull(testName, opstart);
1248             assertNotNull(testName, opend);
1249             assertEquals(testName, expectedResult, opend.getResult());
1250
1251             assertSame(testName, tstart, opstart.getStart());
1252             assertSame(testName, tstart, opend.getStart());
1253
1254             try {
1255                 assertTrue(future.isDone());
1256                 assertEquals(testName, opend, future.get());
1257
1258                 // "start" is never final
1259                 for (OperationOutcome outcome : starts) {
1260                     assertFalse(testName, outcome.isFinalOutcome());
1261                 }
1262
1263                 // only the last "complete" is final
1264                 assertTrue(testName, ends.removeLast().isFinalOutcome());
1265
1266                 for (OperationOutcome outcome : ends) {
1267                     assertFalse(outcome.isFinalOutcome());
1268                 }
1269
1270             } catch (InterruptedException | ExecutionException e) {
1271                 throw new IllegalStateException(e);
1272             }
1273
1274             if (expectedOperations > 0) {
1275                 assertNotNull(testName, oper.getSubRequestId());
1276                 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
1277                 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
1278             }
1279         }
1280
1281         assertEquals(testName, expectedOperations, oper.getCount());
1282     }
1283
1284     /**
1285      * Creates a new {@link #oper} whose coder will throw an exception.
1286      */
1287     private void setOperCoderException() {
1288         oper = new MyOper() {
1289             @Override
1290             protected Coder getCoder() {
1291                 return new StandardCoder() {
1292                     @Override
1293                     public String encode(Object object, boolean pretty) throws CoderException {
1294                         throw new CoderException(EXPECTED_EXCEPTION);
1295                     }
1296                 };
1297             }
1298         };
1299     }
1300
1301
1302     @Getter
1303     public static class MyData {
1304         private String text = TEXT;
1305     }
1306
1307
1308     private class MyOper extends OperationPartial {
1309         @Getter
1310         private int count = 0;
1311
1312         @Setter
1313         private boolean genException;
1314         @Setter
1315         private int maxFailures = 0;
1316         @Setter
1317         private CompletableFuture<OperationOutcome> preProc;
1318
1319
1320         public MyOper() {
1321             super(OperationPartialTest.this.params, config, PROP_NAMES);
1322         }
1323
1324         @Override
1325         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1326             ++count;
1327             if (genException) {
1328                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1329             }
1330
1331             operation.setSubRequestId(String.valueOf(attempt));
1332
1333             if (count > maxFailures) {
1334                 operation.setResult(PolicyResult.SUCCESS);
1335             } else {
1336                 operation.setResult(PolicyResult.FAILURE);
1337             }
1338
1339             return operation;
1340         }
1341
1342         @Override
1343         protected long getRetryWaitMs() {
1344             /*
1345              * Sleep timers run in the background, but we want to control things via the
1346              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1347              */
1348             return 0L;
1349         }
1350
1351         @Override
1352         protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
1353             return (preProc != null ? preProc : super.startPreprocessorAsync());
1354         }
1355     }
1356 }