9bbc528ecede80b566c3047c1fd3168252d08241
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import ch.qos.logback.classic.Logger;
36 import java.time.Instant;
37 import java.util.ArrayDeque;
38 import java.util.Arrays;
39 import java.util.Collections;
40 import java.util.Deque;
41 import java.util.LinkedList;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.Map.Entry;
45 import java.util.UUID;
46 import java.util.concurrent.CompletableFuture;
47 import java.util.concurrent.CompletionException;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.ForkJoinPool;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import java.util.concurrent.atomic.AtomicReference;
54 import java.util.function.Consumer;
55 import java.util.function.Supplier;
56 import java.util.stream.Collectors;
57 import lombok.Getter;
58 import lombok.Setter;
59 import org.junit.AfterClass;
60 import org.junit.Before;
61 import org.junit.BeforeClass;
62 import org.junit.Test;
63 import org.mockito.ArgumentCaptor;
64 import org.mockito.Mock;
65 import org.mockito.MockitoAnnotations;
66 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
67 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
68 import org.onap.policy.common.utils.coder.Coder;
69 import org.onap.policy.common.utils.coder.CoderException;
70 import org.onap.policy.common.utils.coder.StandardCoder;
71 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
72 import org.onap.policy.common.utils.time.PseudoExecutor;
73 import org.onap.policy.controlloop.ControlLoopOperation;
74 import org.onap.policy.controlloop.VirtualControlLoopEvent;
75 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
76 import org.onap.policy.controlloop.actorserviceprovider.Operation;
77 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
78 import org.onap.policy.controlloop.actorserviceprovider.Operator;
79 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
80 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
81 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
82 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
83 import org.onap.policy.controlloop.policy.PolicyResult;
84 import org.slf4j.LoggerFactory;
85
86 public class OperationPartialTest {
87     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
88     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
89     private static final int MAX_REQUESTS = 100;
90     private static final int MAX_PARALLEL = 10;
91     private static final String EXPECTED_EXCEPTION = "expected exception";
92     private static final String ACTOR = "my-actor";
93     private static final String OPERATION = "my-operation";
94     private static final String MY_SINK = "my-sink";
95     private static final String MY_SOURCE = "my-source";
96     private static final String MY_TARGET_ENTITY = "my-entity";
97     private static final String TEXT = "my-text";
98     private static final int TIMEOUT = 1000;
99     private static final UUID REQ_ID = UUID.randomUUID();
100
101     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
102                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
103
104     /**
105      * Used to attach an appender to the class' logger.
106      */
107     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
108     private static final ExtractAppender appender = new ExtractAppender();
109
110     private static final List<String> PROP_NAMES = List.of("hello", "world");
111
112     @Mock
113     private ActorService service;
114     @Mock
115     private Actor guardActor;
116     @Mock
117     private Operator guardOperator;
118     @Mock
119     private Operation guardOperation;
120
121     private VirtualControlLoopEvent event;
122     private ControlLoopEventContext context;
123     private PseudoExecutor executor;
124     private ControlLoopOperationParams params;
125
126     private MyOper oper;
127
128     private int numStart;
129     private int numEnd;
130
131     private Instant tstart;
132
133     private OperationOutcome opstart;
134     private OperationOutcome opend;
135
136     private Deque<OperationOutcome> starts;
137     private Deque<OperationOutcome> ends;
138
139     private OperatorConfig config;
140
141     /**
142      * Attaches the appender to the logger.
143      */
144     @BeforeClass
145     public static void setUpBeforeClass() throws Exception {
146         /*
147          * Attach appender to the logger.
148          */
149         appender.setContext(logger.getLoggerContext());
150         appender.start();
151
152         logger.addAppender(appender);
153     }
154
155     /**
156      * Stops the appender.
157      */
158     @AfterClass
159     public static void tearDownAfterClass() {
160         appender.stop();
161     }
162
163     /**
164      * Initializes the fields, including {@link #oper}.
165      */
166     @Before
167     public void setUp() {
168         MockitoAnnotations.initMocks(this);
169
170         event = new VirtualControlLoopEvent();
171         event.setRequestId(REQ_ID);
172
173         context = new ControlLoopEventContext(event);
174         executor = new PseudoExecutor();
175
176         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
177                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
178                         .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
179
180         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
181         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
182         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
183         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
184
185         config = new OperatorConfig(executor);
186
187         oper = new MyOper();
188
189         tstart = null;
190
191         opstart = null;
192         opend = null;
193
194         starts = new ArrayDeque<>(10);
195         ends = new ArrayDeque<>(10);
196     }
197
198     @Test
199     public void testOperatorPartial_testGetActorName_testGetName() {
200         assertEquals(ACTOR, oper.getActorName());
201         assertEquals(OPERATION, oper.getName());
202         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
203     }
204
205     @Test
206     public void testGetBlockingThread() throws Exception {
207         CompletableFuture<Void> future = new CompletableFuture<>();
208
209         // use the real executor
210         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
211             @Override
212             public Operation buildOperation(ControlLoopOperationParams params) {
213                 return null;
214             }
215         };
216
217         oper2.getBlockingExecutor().execute(() -> future.complete(null));
218
219         assertNull(future.get(5, TimeUnit.SECONDS));
220     }
221
222     @Test
223     public void testGetPropertyNames() {
224         assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
225     }
226
227     @Test
228     public void testGetProperty_testSetProperty() {
229         oper.setProperty("propertyA", "valueA");
230         oper.setProperty("propertyB", "valueB");
231         oper.setProperty("propertyC", 20);
232
233         assertEquals("valueA", oper.getProperty("propertyA"));
234         assertEquals("valueB", oper.getProperty("propertyB"));
235         assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
236     }
237
238     @Test
239     public void testStart() {
240         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
241     }
242
243     /**
244      * Tests start() with multiple running requests.
245      */
246     @Test
247     public void testStartMultiple() {
248         for (int count = 0; count < MAX_PARALLEL; ++count) {
249             oper.start();
250         }
251
252         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
253
254         assertNotNull(opstart);
255         assertNotNull(opend);
256         assertEquals(PolicyResult.SUCCESS, opend.getResult());
257
258         assertEquals(MAX_PARALLEL, numStart);
259         assertEquals(MAX_PARALLEL, oper.getCount());
260         assertEquals(MAX_PARALLEL, numEnd);
261     }
262
263     /**
264      * Tests startPreprocessor() when the preprocessor returns a failure.
265      */
266     @Test
267     public void testStartPreprocessorFailure() {
268         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
269
270         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
271     }
272
273     /**
274      * Tests startPreprocessor() when the preprocessor throws an exception.
275      */
276     @Test
277     public void testStartPreprocessorException() {
278         // arrange for the preprocessor to throw an exception
279         oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
280
281         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
282     }
283
284     /**
285      * Tests startPreprocessor() when the pipeline is not running.
286      */
287     @Test
288     public void testStartPreprocessorNotRunning() {
289         // arrange for the preprocessor to return success, which will be ignored
290         // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
291
292         oper.start().cancel(false);
293         assertTrue(executor.runAll(MAX_REQUESTS));
294
295         assertNull(opstart);
296         assertNull(opend);
297
298         assertEquals(0, numStart);
299         assertEquals(0, oper.getCount());
300         assertEquals(0, numEnd);
301     }
302
303     /**
304      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
305      */
306     @Test
307     public void testStartPreprocessorBuilderException() {
308         oper = new MyOper() {
309             @Override
310             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
311                 throw new IllegalStateException(EXPECTED_EXCEPTION);
312             }
313         };
314
315         assertThatIllegalStateException().isThrownBy(() -> oper.start());
316
317         // should be nothing in the queue
318         assertEquals(0, executor.getQueueLength());
319     }
320
321     @Test
322     public void testStartPreprocessorAsync() {
323         assertNull(oper.startPreprocessorAsync());
324     }
325
326     @Test
327     public void testStartGuardAsync() throws Exception {
328         CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
329         assertTrue(future.isDone());
330         assertEquals(PolicyResult.SUCCESS, future.get().getResult());
331
332         // verify the parameters that were passed
333         ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
334                         ArgumentCaptor.forClass(ControlLoopOperationParams.class);
335         verify(guardOperator).buildOperation(paramsCaptor.capture());
336
337         params = paramsCaptor.getValue();
338         assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
339         assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
340         assertNull(params.getRetry());
341         assertNull(params.getTimeoutSec());
342
343         Map<String, Object> payload = params.getPayload();
344         assertNotNull(payload);
345
346         assertEquals(oper.makeGuardPayload(), payload);
347     }
348
349     @Test
350     public void testMakeGuardPayload() {
351         Map<String, Object> payload = oper.makeGuardPayload();
352         assertSame(REQ_ID, payload.get("requestId"));
353
354         // request id changes, so remove it
355         payload.remove("requestId");
356
357         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity}", payload.toString());
358
359         // repeat, but with closed loop name
360         event.setClosedLoopControlName("my-loop");
361         payload = oper.makeGuardPayload();
362         payload.remove("requestId");
363         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity, clname=my-loop}", payload.toString());
364     }
365
366     @Test
367     public void testStartOperationAsync() {
368         oper.start();
369         assertTrue(executor.runAll(MAX_REQUESTS));
370
371         assertEquals(1, oper.getCount());
372     }
373
374     @Test
375     public void testIsSuccess() {
376         assertFalse(oper.isSuccess(null));
377
378         OperationOutcome outcome = new OperationOutcome();
379
380         outcome.setResult(PolicyResult.SUCCESS);
381         assertTrue(oper.isSuccess(outcome));
382
383         for (PolicyResult failure : FAILURE_RESULTS) {
384             outcome.setResult(failure);
385             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
386         }
387     }
388
389     @Test
390     public void testIsActorFailed() {
391         assertFalse(oper.isActorFailed(null));
392
393         OperationOutcome outcome = params.makeOutcome();
394
395         // incorrect outcome
396         outcome.setResult(PolicyResult.SUCCESS);
397         assertFalse(oper.isActorFailed(outcome));
398
399         outcome.setResult(PolicyResult.FAILURE_RETRIES);
400         assertFalse(oper.isActorFailed(outcome));
401
402         // correct outcome
403         outcome.setResult(PolicyResult.FAILURE);
404
405         // incorrect actor
406         outcome.setActor(MY_SINK);
407         assertFalse(oper.isActorFailed(outcome));
408         outcome.setActor(null);
409         assertFalse(oper.isActorFailed(outcome));
410         outcome.setActor(ACTOR);
411
412         // incorrect operation
413         outcome.setOperation(MY_SINK);
414         assertFalse(oper.isActorFailed(outcome));
415         outcome.setOperation(null);
416         assertFalse(oper.isActorFailed(outcome));
417         outcome.setOperation(OPERATION);
418
419         // correct values
420         assertTrue(oper.isActorFailed(outcome));
421     }
422
423     @Test
424     public void testDoOperation() {
425         /*
426          * Use an operation that doesn't override doOperation().
427          */
428         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
429
430         oper2.start();
431         assertTrue(executor.runAll(MAX_REQUESTS));
432
433         assertNotNull(opend);
434         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
435     }
436
437     @Test
438     public void testTimeout() throws Exception {
439
440         // use a real executor
441         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
442
443         // trigger timeout very quickly
444         oper = new MyOper() {
445             @Override
446             protected long getTimeoutMs(Integer timeoutSec) {
447                 return 1;
448             }
449
450             @Override
451             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
452
453                 OperationOutcome outcome2 = params.makeOutcome();
454                 outcome2.setResult(PolicyResult.SUCCESS);
455
456                 /*
457                  * Create an incomplete future that will timeout after the operation's
458                  * timeout. If it fires before the other timer, then it will return a
459                  * SUCCESS outcome.
460                  */
461                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
462                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
463                                 params.getExecutor());
464
465                 return future;
466             }
467         };
468
469         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
470     }
471
472     /**
473      * Tests retry functions, when the count is set to zero and retries are exhausted.
474      */
475     @Test
476     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
477         params = params.toBuilder().retry(0).build();
478
479         // new params, thus need a new operation
480         oper = new MyOper();
481
482         oper.setMaxFailures(10);
483
484         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
485     }
486
487     /**
488      * Tests retry functions, when the count is null and retries are exhausted.
489      */
490     @Test
491     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
492         params = params.toBuilder().retry(null).build();
493
494         // new params, thus need a new operation
495         oper = new MyOper();
496
497         oper.setMaxFailures(10);
498
499         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
500     }
501
502     /**
503      * Tests retry functions, when retries are exhausted.
504      */
505     @Test
506     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
507         final int maxRetries = 3;
508         params = params.toBuilder().retry(maxRetries).build();
509
510         // new params, thus need a new operation
511         oper = new MyOper();
512
513         oper.setMaxFailures(10);
514
515         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
516                         PolicyResult.FAILURE_RETRIES);
517     }
518
519     /**
520      * Tests retry functions, when a success follows some retries.
521      */
522     @Test
523     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
524         params = params.toBuilder().retry(10).build();
525
526         // new params, thus need a new operation
527         oper = new MyOper();
528
529         final int maxFailures = 3;
530         oper.setMaxFailures(maxFailures);
531
532         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
533                         PolicyResult.SUCCESS);
534     }
535
536     /**
537      * Tests retry functions, when the outcome is {@code null}.
538      */
539     @Test
540     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
541
542         // arrange to return null from doOperation()
543         oper = new MyOper() {
544             @Override
545             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
546
547                 // update counters
548                 super.doOperation(attempt, outcome);
549                 return null;
550             }
551         };
552
553         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, noop());
554     }
555
556     @Test
557     public void testSleep() throws Exception {
558         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
559         assertTrue(future.isDone());
560         assertNull(future.get());
561
562         // edge case
563         future = oper.sleep(0, TimeUnit.SECONDS);
564         assertTrue(future.isDone());
565         assertNull(future.get());
566
567         /*
568          * Start a second sleep we can use to check the first while it's running.
569          */
570         tstart = Instant.now();
571         future = oper.sleep(100, TimeUnit.MILLISECONDS);
572
573         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
574
575         // wait for second to complete and verify that the first has not completed
576         future2.get();
577         assertFalse(future.isDone());
578
579         // wait for second to complete
580         future.get();
581
582         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
583         assertTrue(diff >= 99);
584     }
585
586     @Test
587     public void testIsSameOperation() {
588         assertFalse(oper.isSameOperation(null));
589
590         OperationOutcome outcome = params.makeOutcome();
591
592         // wrong actor - should be false
593         outcome.setActor(null);
594         assertFalse(oper.isSameOperation(outcome));
595         outcome.setActor(MY_SINK);
596         assertFalse(oper.isSameOperation(outcome));
597         outcome.setActor(ACTOR);
598
599         // wrong operation - should be null
600         outcome.setOperation(null);
601         assertFalse(oper.isSameOperation(outcome));
602         outcome.setOperation(MY_SINK);
603         assertFalse(oper.isSameOperation(outcome));
604         outcome.setOperation(OPERATION);
605
606         assertTrue(oper.isSameOperation(outcome));
607     }
608
609     /**
610      * Tests handleFailure() when the outcome is a success.
611      */
612     @Test
613     public void testHandlePreprocessorFailureSuccess() {
614         oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
615         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
616     }
617
618     /**
619      * Tests handleFailure() when the outcome is <i>not</i> a success.
620      */
621     @Test
622     public void testHandlePreprocessorFailureFailed() throws Exception {
623         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
624         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
625     }
626
627     /**
628      * Tests handleFailure() when the outcome is {@code null}.
629      */
630     @Test
631     public void testHandlePreprocessorFailureNull() throws Exception {
632         // arrange to return a null outcome from the preprocessor
633         oper.setPreProc(CompletableFuture.completedFuture(null));
634         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
635     }
636
637     @Test
638     public void testFromException() {
639         // arrange to generate an exception when operation runs
640         oper.setGenException(true);
641
642         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
643     }
644
645     /**
646      * Tests fromException() when there is no exception.
647      */
648     @Test
649     public void testFromExceptionNoExcept() {
650         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
651     }
652
653     /**
654      * Tests both flavors of anyOf(), because one invokes the other.
655      */
656     @Test
657     public void testAnyOf() throws Exception {
658         // first task completes, others do not
659         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
660
661         final OperationOutcome outcome = params.makeOutcome();
662
663         tasks.add(() -> CompletableFuture.completedFuture(outcome));
664         tasks.add(() -> new CompletableFuture<>());
665         tasks.add(() -> null);
666         tasks.add(() -> new CompletableFuture<>());
667
668         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
669         assertTrue(executor.runAll(MAX_REQUESTS));
670         assertTrue(result.isDone());
671         assertSame(outcome, result.get());
672
673         // repeat using array form
674         @SuppressWarnings("unchecked")
675         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
676         result = oper.anyOf(tasks.toArray(taskArray));
677         assertTrue(executor.runAll(MAX_REQUESTS));
678         assertTrue(result.isDone());
679         assertSame(outcome, result.get());
680
681         // second task completes, others do not
682         tasks.clear();
683         tasks.add(() -> new CompletableFuture<>());
684         tasks.add(() -> CompletableFuture.completedFuture(outcome));
685         tasks.add(() -> new CompletableFuture<>());
686
687         result = oper.anyOf(tasks);
688         assertTrue(executor.runAll(MAX_REQUESTS));
689         assertTrue(result.isDone());
690         assertSame(outcome, result.get());
691
692         // third task completes, others do not
693         tasks.clear();
694         tasks.add(() -> new CompletableFuture<>());
695         tasks.add(() -> new CompletableFuture<>());
696         tasks.add(() -> CompletableFuture.completedFuture(outcome));
697
698         result = oper.anyOf(tasks);
699         assertTrue(executor.runAll(MAX_REQUESTS));
700         assertTrue(result.isDone());
701         assertSame(outcome, result.get());
702     }
703
704     /**
705      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
706      */
707     @Test
708     @SuppressWarnings("unchecked")
709     public void testAnyOfEdge() throws Exception {
710         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
711
712         // zero items: check both using a list and using an array
713         assertNull(oper.anyOf(tasks));
714         assertNull(oper.anyOf());
715
716         // one item: : check both using a list and using an array
717         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
718         tasks.add(() -> future1);
719
720         assertSame(future1, oper.anyOf(tasks));
721         assertSame(future1, oper.anyOf(() -> future1));
722     }
723
724     @Test
725     public void testAllOfArray() throws Exception {
726         final OperationOutcome outcome = params.makeOutcome();
727
728         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
729         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
730         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
731
732         @SuppressWarnings("unchecked")
733         CompletableFuture<OperationOutcome> result =
734                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
735
736         assertTrue(executor.runAll(MAX_REQUESTS));
737         assertFalse(result.isDone());
738         future1.complete(outcome);
739
740         // complete 3 before 2
741         assertTrue(executor.runAll(MAX_REQUESTS));
742         assertFalse(result.isDone());
743         future3.complete(outcome);
744
745         assertTrue(executor.runAll(MAX_REQUESTS));
746         assertFalse(result.isDone());
747         future2.complete(outcome);
748
749         // all of them are now done
750         assertTrue(executor.runAll(MAX_REQUESTS));
751         assertTrue(result.isDone());
752         assertSame(outcome, result.get());
753     }
754
755     @Test
756     public void testAllOfList() throws Exception {
757         final OperationOutcome outcome = params.makeOutcome();
758
759         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
760         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
761         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
762
763         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
764         tasks.add(() -> future1);
765         tasks.add(() -> future2);
766         tasks.add(() -> null);
767         tasks.add(() -> future3);
768
769         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
770
771         assertTrue(executor.runAll(MAX_REQUESTS));
772         assertFalse(result.isDone());
773         future1.complete(outcome);
774
775         // complete 3 before 2
776         assertTrue(executor.runAll(MAX_REQUESTS));
777         assertFalse(result.isDone());
778         future3.complete(outcome);
779
780         assertTrue(executor.runAll(MAX_REQUESTS));
781         assertFalse(result.isDone());
782         future2.complete(outcome);
783
784         // all of them are now done
785         assertTrue(executor.runAll(MAX_REQUESTS));
786         assertTrue(result.isDone());
787         assertSame(outcome, result.get());
788     }
789
790     /**
791      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
792      */
793     @Test
794     @SuppressWarnings("unchecked")
795     public void testAllOfEdge() throws Exception {
796         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
797
798         // zero items: check both using a list and using an array
799         assertNull(oper.allOf(tasks));
800         assertNull(oper.allOf());
801
802         // one item: : check both using a list and using an array
803         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
804         tasks.add(() -> future1);
805
806         assertSame(future1, oper.allOf(tasks));
807         assertSame(future1, oper.allOf(() -> future1));
808     }
809
810     @Test
811     public void testAttachFutures() throws Exception {
812         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
813
814         // third task throws an exception during construction
815         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
816         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
817         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
818         tasks.add(() -> future1);
819         tasks.add(() -> future2);
820         tasks.add(() -> {
821             throw new IllegalStateException(EXPECTED_EXCEPTION);
822         });
823         tasks.add(() -> future3);
824
825         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
826
827         // should have canceled the first two, but not the last
828         assertTrue(future1.isCancelled());
829         assertTrue(future2.isCancelled());
830         assertFalse(future3.isCancelled());
831     }
832
833     @Test
834     public void testCombineOutcomes() throws Exception {
835         // only one outcome
836         verifyOutcomes(0, PolicyResult.SUCCESS);
837         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
838
839         // maximum is in different positions
840         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
841         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
842         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
843
844         // null outcome - takes precedence over a success
845         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
846         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
847         tasks.add(() -> CompletableFuture.completedFuture(null));
848         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
849         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
850
851         assertTrue(executor.runAll(MAX_REQUESTS));
852         assertTrue(result.isDone());
853         assertNull(result.get());
854
855         // one throws an exception during execution
856         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
857
858         tasks.clear();
859         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
860         tasks.add(() -> CompletableFuture.failedFuture(except));
861         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
862         result = oper.allOf(tasks);
863
864         assertTrue(executor.runAll(MAX_REQUESTS));
865         assertTrue(result.isCompletedExceptionally());
866         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
867     }
868
869     /**
870      * Tests both flavors of sequence(), because one invokes the other.
871      */
872     @Test
873     public void testSequence() throws Exception {
874         final OperationOutcome outcome = params.makeOutcome();
875
876         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
877         tasks.add(() -> CompletableFuture.completedFuture(outcome));
878         tasks.add(() -> null);
879         tasks.add(() -> CompletableFuture.completedFuture(outcome));
880         tasks.add(() -> CompletableFuture.completedFuture(outcome));
881
882         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
883         assertTrue(executor.runAll(MAX_REQUESTS));
884         assertTrue(result.isDone());
885         assertSame(outcome, result.get());
886
887         // repeat using array form
888         @SuppressWarnings("unchecked")
889         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
890         result = oper.sequence(tasks.toArray(taskArray));
891         assertTrue(executor.runAll(MAX_REQUESTS));
892         assertTrue(result.isDone());
893         assertSame(outcome, result.get());
894
895         // second task fails, third should not run
896         OperationOutcome failure = params.makeOutcome();
897         failure.setResult(PolicyResult.FAILURE);
898         tasks.clear();
899         tasks.add(() -> CompletableFuture.completedFuture(outcome));
900         tasks.add(() -> CompletableFuture.completedFuture(failure));
901         tasks.add(() -> CompletableFuture.completedFuture(outcome));
902
903         result = oper.sequence(tasks);
904         assertTrue(executor.runAll(MAX_REQUESTS));
905         assertTrue(result.isDone());
906         assertSame(failure, result.get());
907     }
908
909     /**
910      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
911      */
912     @Test
913     @SuppressWarnings("unchecked")
914     public void testSequenceEdge() throws Exception {
915         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
916
917         // zero items: check both using a list and using an array
918         assertNull(oper.sequence(tasks));
919         assertNull(oper.sequence());
920
921         // one item: : check both using a list and using an array
922         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
923         tasks.add(() -> future1);
924
925         assertSame(future1, oper.sequence(tasks));
926         assertSame(future1, oper.sequence(() -> future1));
927     }
928
929     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
930         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
931
932         OperationOutcome expectedOutcome = null;
933
934         for (int count = 0; count < results.length; ++count) {
935             OperationOutcome outcome = params.makeOutcome();
936             outcome.setResult(results[count]);
937             tasks.add(() -> CompletableFuture.completedFuture(outcome));
938
939             if (count == expected) {
940                 expectedOutcome = outcome;
941             }
942         }
943
944         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
945
946         assertTrue(executor.runAll(MAX_REQUESTS));
947         assertTrue(result.isDone());
948         assertSame(expectedOutcome, result.get());
949     }
950
951     @Test
952     public void testDetmPriority() throws CoderException {
953         assertEquals(1, oper.detmPriority(null));
954
955         OperationOutcome outcome = params.makeOutcome();
956
957         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
958                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
959                         PolicyResult.FAILURE_EXCEPTION, 6);
960
961         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
962             outcome.setResult(ent.getKey());
963             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
964         }
965
966         /*
967          * Test null result. We can't actually set it to null, because the set() method
968          * won't allow it. Instead, we decode it from a structure.
969          */
970         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
971         assertEquals(1, oper.detmPriority(outcome));
972     }
973
974     /**
975      * Tests callbackStarted() when the pipeline has already been stopped.
976      */
977     @Test
978     public void testCallbackStartedNotRunning() {
979         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
980
981         /*
982          * arrange to stop the controller when the start-callback is invoked, but capture
983          * the outcome
984          */
985         params = params.toBuilder().startCallback(oper -> {
986             starter(oper);
987             future.get().cancel(false);
988         }).build();
989
990         // new params, thus need a new operation
991         oper = new MyOper();
992
993         future.set(oper.start());
994         assertTrue(executor.runAll(MAX_REQUESTS));
995
996         // should have only run once
997         assertEquals(1, numStart);
998     }
999
1000     /**
1001      * Tests callbackCompleted() when the pipeline has already been stopped.
1002      */
1003     @Test
1004     public void testCallbackCompletedNotRunning() {
1005         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1006
1007         // arrange to stop the controller when the start-callback is invoked
1008         params = params.toBuilder().startCallback(oper -> {
1009             future.get().cancel(false);
1010         }).build();
1011
1012         // new params, thus need a new operation
1013         oper = new MyOper();
1014
1015         future.set(oper.start());
1016         assertTrue(executor.runAll(MAX_REQUESTS));
1017
1018         // should not have been set
1019         assertNull(opend);
1020         assertEquals(0, numEnd);
1021     }
1022
1023     @Test
1024     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1025         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1026
1027         OperationOutcome outcome;
1028
1029         outcome = new OperationOutcome();
1030         oper.setOutcome(outcome, timex);
1031         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1032         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1033
1034         outcome = new OperationOutcome();
1035         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1036         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1037         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1038     }
1039
1040     @Test
1041     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1042         OperationOutcome outcome;
1043
1044         outcome = new OperationOutcome();
1045         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1046         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1047         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1048
1049         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1050         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1051         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1052
1053         for (PolicyResult result : FAILURE_RESULTS) {
1054             outcome = new OperationOutcome();
1055             oper.setOutcome(outcome, result);
1056             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1057             assertEquals(result.toString(), result, outcome.getResult());
1058         }
1059     }
1060
1061     @Test
1062     public void testIsTimeout() {
1063         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1064
1065         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1066         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1067         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1068         assertFalse(oper.isTimeout(new CompletionException(null)));
1069         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1070
1071         assertTrue(oper.isTimeout(timex));
1072         assertTrue(oper.isTimeout(new CompletionException(timex)));
1073     }
1074
1075     @Test
1076     public void testLogMessage() {
1077         final String infraStr = SINK_INFRA.toString();
1078
1079         // log structured data
1080         appender.clearExtractions();
1081         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1082         List<String> output = appender.getExtracted();
1083         assertEquals(1, output.size());
1084
1085         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1086                         .contains("{\n  \"text\": \"my-text\"\n}");
1087
1088         // repeat with a response
1089         appender.clearExtractions();
1090         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1091         output = appender.getExtracted();
1092         assertEquals(1, output.size());
1093
1094         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1095                         .contains("{\n  \"text\": \"my-text\"\n}");
1096
1097         // log a plain string
1098         appender.clearExtractions();
1099         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1100         output = appender.getExtracted();
1101         assertEquals(1, output.size());
1102         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1103
1104         // log a null request
1105         appender.clearExtractions();
1106         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1107         output = appender.getExtracted();
1108         assertEquals(1, output.size());
1109
1110         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1111
1112         // generate exception from coder
1113         setOperCoderException();
1114
1115         appender.clearExtractions();
1116         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1117         output = appender.getExtracted();
1118         assertEquals(2, output.size());
1119         assertThat(output.get(0)).contains("cannot pretty-print request");
1120         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1121
1122         // repeat with a response
1123         appender.clearExtractions();
1124         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1125         output = appender.getExtracted();
1126         assertEquals(2, output.size());
1127         assertThat(output.get(0)).contains("cannot pretty-print response");
1128         assertThat(output.get(1)).contains(MY_SOURCE);
1129     }
1130
1131     @Test
1132     public void testGetRetry() {
1133         assertEquals(0, oper.getRetry(null));
1134         assertEquals(10, oper.getRetry(10));
1135     }
1136
1137     @Test
1138     public void testGetRetryWait() {
1139         // need an operator that doesn't override the retry time
1140         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1141         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1142     }
1143
1144     @Test
1145     public void testGetTimeOutMs() {
1146         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1147
1148         params = params.toBuilder().timeoutSec(null).build();
1149
1150         // new params, thus need a new operation
1151         oper = new MyOper();
1152
1153         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1154     }
1155
1156     private void starter(OperationOutcome oper) {
1157         ++numStart;
1158         tstart = oper.getStart();
1159         opstart = oper;
1160         starts.add(oper);
1161     }
1162
1163     private void completer(OperationOutcome oper) {
1164         ++numEnd;
1165         opend = oper;
1166         ends.add(oper);
1167     }
1168
1169     /**
1170      * Gets a function that does nothing.
1171      *
1172      * @param <T> type of input parameter expected by the function
1173      * @return a function that does nothing
1174      */
1175     private <T> Consumer<T> noop() {
1176         return unused -> {
1177         };
1178     }
1179
1180     private OperationOutcome makeSuccess() {
1181         OperationOutcome outcome = params.makeOutcome();
1182         outcome.setResult(PolicyResult.SUCCESS);
1183
1184         return outcome;
1185     }
1186
1187     private OperationOutcome makeFailure() {
1188         OperationOutcome outcome = params.makeOutcome();
1189         outcome.setResult(PolicyResult.FAILURE);
1190
1191         return outcome;
1192     }
1193
1194     /**
1195      * Verifies a run.
1196      *
1197      * @param testName test name
1198      * @param expectedCallbacks number of callbacks expected
1199      * @param expectedOperations number of operation invocations expected
1200      * @param expectedResult expected outcome
1201      */
1202     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1203                     PolicyResult expectedResult) {
1204
1205         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
1206     }
1207
1208     /**
1209      * Verifies a run.
1210      *
1211      * @param testName test name
1212      * @param expectedCallbacks number of callbacks expected
1213      * @param expectedOperations number of operation invocations expected
1214      * @param expectedResult expected outcome
1215      * @param manipulator function to modify the future returned by
1216      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1217      *        in the executor are run
1218      */
1219     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1220                     Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1221
1222         tstart = null;
1223         opstart = null;
1224         opend = null;
1225         starts.clear();
1226         ends.clear();
1227
1228         CompletableFuture<OperationOutcome> future = oper.start();
1229
1230         manipulator.accept(future);
1231
1232         assertTrue(testName, executor.runAll(MAX_REQUESTS));
1233
1234         assertEquals(testName, expectedCallbacks, numStart);
1235         assertEquals(testName, expectedCallbacks, numEnd);
1236
1237         if (expectedCallbacks > 0) {
1238             assertNotNull(testName, opstart);
1239             assertNotNull(testName, opend);
1240             assertEquals(testName, expectedResult, opend.getResult());
1241
1242             assertSame(testName, tstart, opstart.getStart());
1243             assertSame(testName, tstart, opend.getStart());
1244
1245             try {
1246                 assertTrue(future.isDone());
1247                 assertEquals(testName, opend, future.get());
1248
1249                 // "start" is never final
1250                 for (OperationOutcome outcome : starts) {
1251                     assertFalse(testName, outcome.isFinalOutcome());
1252                 }
1253
1254                 // only the last "complete" is final
1255                 assertTrue(testName, ends.removeLast().isFinalOutcome());
1256
1257                 for (OperationOutcome outcome : ends) {
1258                     assertFalse(outcome.isFinalOutcome());
1259                 }
1260
1261             } catch (InterruptedException | ExecutionException e) {
1262                 throw new IllegalStateException(e);
1263             }
1264
1265             if (expectedOperations > 0) {
1266                 assertNotNull(testName, oper.getSubRequestId());
1267                 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
1268                 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
1269             }
1270         }
1271
1272         assertEquals(testName, expectedOperations, oper.getCount());
1273     }
1274
1275     /**
1276      * Creates a new {@link #oper} whose coder will throw an exception.
1277      */
1278     private void setOperCoderException() {
1279         oper = new MyOper() {
1280             @Override
1281             protected Coder getCoder() {
1282                 return new StandardCoder() {
1283                     @Override
1284                     public String encode(Object object, boolean pretty) throws CoderException {
1285                         throw new CoderException(EXPECTED_EXCEPTION);
1286                     }
1287                 };
1288             }
1289         };
1290     }
1291
1292
1293     @Getter
1294     public static class MyData {
1295         private String text = TEXT;
1296     }
1297
1298
1299     private class MyOper extends OperationPartial {
1300         @Getter
1301         private int count = 0;
1302
1303         @Setter
1304         private boolean genException;
1305         @Setter
1306         private int maxFailures = 0;
1307         @Setter
1308         private CompletableFuture<OperationOutcome> preProc;
1309
1310
1311         public MyOper() {
1312             super(OperationPartialTest.this.params, config, PROP_NAMES);
1313         }
1314
1315         @Override
1316         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1317             ++count;
1318             if (genException) {
1319                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1320             }
1321
1322             operation.setSubRequestId(String.valueOf(attempt));
1323
1324             if (count > maxFailures) {
1325                 operation.setResult(PolicyResult.SUCCESS);
1326             } else {
1327                 operation.setResult(PolicyResult.FAILURE);
1328             }
1329
1330             return operation;
1331         }
1332
1333         @Override
1334         protected long getRetryWaitMs() {
1335             /*
1336              * Sleep timers run in the background, but we want to control things via the
1337              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1338              */
1339             return 0L;
1340         }
1341
1342         @Override
1343         protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
1344             return (preProc != null ? preProc : super.startPreprocessorAsync());
1345         }
1346     }
1347 }