Make targetEntity a property
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / test / java / org / onap / policy / controlloop / actorserviceprovider / impl / OperationPartialTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import ch.qos.logback.classic.Logger;
36 import java.time.Instant;
37 import java.util.ArrayDeque;
38 import java.util.Arrays;
39 import java.util.Collections;
40 import java.util.Deque;
41 import java.util.LinkedList;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.Map.Entry;
45 import java.util.UUID;
46 import java.util.concurrent.CompletableFuture;
47 import java.util.concurrent.CompletionException;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.ForkJoinPool;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import java.util.concurrent.atomic.AtomicReference;
54 import java.util.function.Consumer;
55 import java.util.function.Supplier;
56 import java.util.stream.Collectors;
57 import lombok.Getter;
58 import lombok.Setter;
59 import org.junit.AfterClass;
60 import org.junit.Before;
61 import org.junit.BeforeClass;
62 import org.junit.Test;
63 import org.mockito.ArgumentCaptor;
64 import org.mockito.Mock;
65 import org.mockito.MockitoAnnotations;
66 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
67 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
68 import org.onap.policy.common.utils.coder.Coder;
69 import org.onap.policy.common.utils.coder.CoderException;
70 import org.onap.policy.common.utils.coder.StandardCoder;
71 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
72 import org.onap.policy.common.utils.time.PseudoExecutor;
73 import org.onap.policy.controlloop.ControlLoopOperation;
74 import org.onap.policy.controlloop.VirtualControlLoopEvent;
75 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
76 import org.onap.policy.controlloop.actorserviceprovider.Operation;
77 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
78 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
79 import org.onap.policy.controlloop.actorserviceprovider.Operator;
80 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
81 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
82 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
83 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
84 import org.onap.policy.controlloop.policy.PolicyResult;
85 import org.slf4j.LoggerFactory;
86
87 public class OperationPartialTest {
88     private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
89     private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
90     private static final int MAX_REQUESTS = 100;
91     private static final int MAX_PARALLEL = 10;
92     private static final String EXPECTED_EXCEPTION = "expected exception";
93     private static final String ACTOR = "my-actor";
94     private static final String OPERATION = "my-operation";
95     private static final String MY_SINK = "my-sink";
96     private static final String MY_SOURCE = "my-source";
97     private static final String MY_TARGET_ENTITY = "my-entity";
98     private static final String TEXT = "my-text";
99     private static final int TIMEOUT = 1000;
100     private static final UUID REQ_ID = UUID.randomUUID();
101
102     private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
103                     .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
104
105     /**
106      * Used to attach an appender to the class' logger.
107      */
108     private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
109     private static final ExtractAppender appender = new ExtractAppender();
110
111     private static final List<String> PROP_NAMES = List.of("hello", "world");
112
113     @Mock
114     private ActorService service;
115     @Mock
116     private Actor guardActor;
117     @Mock
118     private Operator guardOperator;
119     @Mock
120     private Operation guardOperation;
121
122     private VirtualControlLoopEvent event;
123     private ControlLoopEventContext context;
124     private PseudoExecutor executor;
125     private ControlLoopOperationParams params;
126
127     private MyOper oper;
128
129     private int numStart;
130     private int numEnd;
131
132     private Instant tstart;
133
134     private OperationOutcome opstart;
135     private OperationOutcome opend;
136
137     private Deque<OperationOutcome> starts;
138     private Deque<OperationOutcome> ends;
139
140     private OperatorConfig config;
141
142     /**
143      * Attaches the appender to the logger.
144      */
145     @BeforeClass
146     public static void setUpBeforeClass() throws Exception {
147         /*
148          * Attach appender to the logger.
149          */
150         appender.setContext(logger.getLoggerContext());
151         appender.start();
152
153         logger.addAppender(appender);
154     }
155
156     /**
157      * Stops the appender.
158      */
159     @AfterClass
160     public static void tearDownAfterClass() {
161         appender.stop();
162     }
163
164     /**
165      * Initializes the fields, including {@link #oper}.
166      */
167     @Before
168     public void setUp() {
169         MockitoAnnotations.initMocks(this);
170
171         event = new VirtualControlLoopEvent();
172         event.setRequestId(REQ_ID);
173
174         context = new ControlLoopEventContext(event);
175         executor = new PseudoExecutor();
176
177         params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
178                         .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
179                         .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
180
181         when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
182         when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
183         when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
184         when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
185
186         config = new OperatorConfig(executor);
187
188         oper = new MyOper();
189
190         tstart = null;
191
192         opstart = null;
193         opend = null;
194
195         starts = new ArrayDeque<>(10);
196         ends = new ArrayDeque<>(10);
197     }
198
199     @Test
200     public void testOperatorPartial_testGetActorName_testGetName() {
201         assertEquals(ACTOR, oper.getActorName());
202         assertEquals(OPERATION, oper.getName());
203         assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
204     }
205
206     @Test
207     public void testGetBlockingThread() throws Exception {
208         CompletableFuture<Void> future = new CompletableFuture<>();
209
210         // use the real executor
211         OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
212             @Override
213             public Operation buildOperation(ControlLoopOperationParams params) {
214                 return null;
215             }
216         };
217
218         oper2.getBlockingExecutor().execute(() -> future.complete(null));
219
220         assertNull(future.get(5, TimeUnit.SECONDS));
221     }
222
223     @Test
224     public void testGetPropertyNames() {
225         assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
226     }
227
228     @Test
229     public void testGetProperty_testSetProperty() {
230         oper.setProperty("propertyA", "valueA");
231         oper.setProperty("propertyB", "valueB");
232         oper.setProperty("propertyC", 20);
233
234         assertEquals("valueA", oper.getProperty("propertyA"));
235         assertEquals("valueB", oper.getProperty("propertyB"));
236         assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
237     }
238
239     @Test
240     public void testStart() {
241         verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
242     }
243
244     /**
245      * Tests start() with multiple running requests.
246      */
247     @Test
248     public void testStartMultiple() {
249         for (int count = 0; count < MAX_PARALLEL; ++count) {
250             oper.start();
251         }
252
253         assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
254
255         assertNotNull(opstart);
256         assertNotNull(opend);
257         assertEquals(PolicyResult.SUCCESS, opend.getResult());
258
259         assertEquals(MAX_PARALLEL, numStart);
260         assertEquals(MAX_PARALLEL, oper.getCount());
261         assertEquals(MAX_PARALLEL, numEnd);
262     }
263
264     /**
265      * Tests startPreprocessor() when the preprocessor returns a failure.
266      */
267     @Test
268     public void testStartPreprocessorFailure() {
269         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
270
271         verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
272     }
273
274     /**
275      * Tests startPreprocessor() when the preprocessor throws an exception.
276      */
277     @Test
278     public void testStartPreprocessorException() {
279         // arrange for the preprocessor to throw an exception
280         oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
281
282         verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
283     }
284
285     /**
286      * Tests startPreprocessor() when the pipeline is not running.
287      */
288     @Test
289     public void testStartPreprocessorNotRunning() {
290         // arrange for the preprocessor to return success, which will be ignored
291         // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
292
293         oper.start().cancel(false);
294         assertTrue(executor.runAll(MAX_REQUESTS));
295
296         assertNull(opstart);
297         assertNull(opend);
298
299         assertEquals(0, numStart);
300         assertEquals(0, oper.getCount());
301         assertEquals(0, numEnd);
302     }
303
304     /**
305      * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
306      */
307     @Test
308     public void testStartPreprocessorBuilderException() {
309         oper = new MyOper() {
310             @Override
311             protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
312                 throw new IllegalStateException(EXPECTED_EXCEPTION);
313             }
314         };
315
316         assertThatIllegalStateException().isThrownBy(() -> oper.start());
317
318         // should be nothing in the queue
319         assertEquals(0, executor.getQueueLength());
320     }
321
322     @Test
323     public void testStartPreprocessorAsync() {
324         assertNull(oper.startPreprocessorAsync());
325     }
326
327     @Test
328     public void testStartGuardAsync() throws Exception {
329         CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
330         assertTrue(future.isDone());
331         assertEquals(PolicyResult.SUCCESS, future.get().getResult());
332
333         // verify the parameters that were passed
334         ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
335                         ArgumentCaptor.forClass(ControlLoopOperationParams.class);
336         verify(guardOperator).buildOperation(paramsCaptor.capture());
337
338         params = paramsCaptor.getValue();
339         assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
340         assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
341         assertNull(params.getRetry());
342         assertNull(params.getTimeoutSec());
343
344         Map<String, Object> payload = params.getPayload();
345         assertNotNull(payload);
346
347         assertEquals(oper.makeGuardPayload(), payload);
348     }
349
350     /**
351      * Tests startGuardAsync() when preprocessing is disabled.
352      */
353     @Test
354     public void testStartGuardAsyncDisabled() {
355         params = params.toBuilder().preprocessed(true).build();
356         assertNull(new MyOper().startGuardAsync());
357     }
358
359     @Test
360     public void testMakeGuardPayload() {
361         Map<String, Object> payload = oper.makeGuardPayload();
362         assertSame(REQ_ID, payload.get("requestId"));
363
364         // request id changes, so remove it
365         payload.remove("requestId");
366
367         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity}", payload.toString());
368
369         // repeat, but with closed loop name
370         event.setClosedLoopControlName("my-loop");
371         payload = oper.makeGuardPayload();
372         payload.remove("requestId");
373         assertEquals("{actor=my-actor, operation=my-operation, target=my-entity, clname=my-loop}", payload.toString());
374     }
375
376     @Test
377     public void testStartOperationAsync() {
378         oper.start();
379         assertTrue(executor.runAll(MAX_REQUESTS));
380
381         assertEquals(1, oper.getCount());
382     }
383
384     @Test
385     public void testIsSuccess() {
386         assertFalse(oper.isSuccess(null));
387
388         OperationOutcome outcome = new OperationOutcome();
389
390         outcome.setResult(PolicyResult.SUCCESS);
391         assertTrue(oper.isSuccess(outcome));
392
393         for (PolicyResult failure : FAILURE_RESULTS) {
394             outcome.setResult(failure);
395             assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
396         }
397     }
398
399     @Test
400     public void testIsActorFailed() {
401         assertFalse(oper.isActorFailed(null));
402
403         OperationOutcome outcome = params.makeOutcome(null);
404
405         // incorrect outcome
406         outcome.setResult(PolicyResult.SUCCESS);
407         assertFalse(oper.isActorFailed(outcome));
408
409         outcome.setResult(PolicyResult.FAILURE_RETRIES);
410         assertFalse(oper.isActorFailed(outcome));
411
412         // correct outcome
413         outcome.setResult(PolicyResult.FAILURE);
414
415         // incorrect actor
416         outcome.setActor(MY_SINK);
417         assertFalse(oper.isActorFailed(outcome));
418         outcome.setActor(null);
419         assertFalse(oper.isActorFailed(outcome));
420         outcome.setActor(ACTOR);
421
422         // incorrect operation
423         outcome.setOperation(MY_SINK);
424         assertFalse(oper.isActorFailed(outcome));
425         outcome.setOperation(null);
426         assertFalse(oper.isActorFailed(outcome));
427         outcome.setOperation(OPERATION);
428
429         // correct values
430         assertTrue(oper.isActorFailed(outcome));
431     }
432
433     @Test
434     public void testDoOperation() {
435         /*
436          * Use an operation that doesn't override doOperation().
437          */
438         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
439
440         oper2.start();
441         assertTrue(executor.runAll(MAX_REQUESTS));
442
443         assertNotNull(opend);
444         assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
445     }
446
447     @Test
448     public void testTimeout() throws Exception {
449
450         // use a real executor
451         params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
452
453         // trigger timeout very quickly
454         oper = new MyOper() {
455             @Override
456             protected long getTimeoutMs(Integer timeoutSec) {
457                 return 1;
458             }
459
460             @Override
461             protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
462
463                 OperationOutcome outcome2 = params.makeOutcome(null);
464                 outcome2.setResult(PolicyResult.SUCCESS);
465
466                 /*
467                  * Create an incomplete future that will timeout after the operation's
468                  * timeout. If it fires before the other timer, then it will return a
469                  * SUCCESS outcome.
470                  */
471                 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
472                 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
473                                 params.getExecutor());
474
475                 return future;
476             }
477         };
478
479         assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
480     }
481
482     /**
483      * Tests retry functions, when the count is set to zero and retries are exhausted.
484      */
485     @Test
486     public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
487         params = params.toBuilder().retry(0).build();
488
489         // new params, thus need a new operation
490         oper = new MyOper();
491
492         oper.setMaxFailures(10);
493
494         verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
495     }
496
497     /**
498      * Tests retry functions, when the count is null and retries are exhausted.
499      */
500     @Test
501     public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
502         params = params.toBuilder().retry(null).build();
503
504         // new params, thus need a new operation
505         oper = new MyOper();
506
507         oper.setMaxFailures(10);
508
509         verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
510     }
511
512     /**
513      * Tests retry functions, when retries are exhausted.
514      */
515     @Test
516     public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
517         final int maxRetries = 3;
518         params = params.toBuilder().retry(maxRetries).build();
519
520         // new params, thus need a new operation
521         oper = new MyOper();
522
523         oper.setMaxFailures(10);
524
525         verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
526                         PolicyResult.FAILURE_RETRIES);
527     }
528
529     /**
530      * Tests retry functions, when a success follows some retries.
531      */
532     @Test
533     public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
534         params = params.toBuilder().retry(10).build();
535
536         // new params, thus need a new operation
537         oper = new MyOper();
538
539         final int maxFailures = 3;
540         oper.setMaxFailures(maxFailures);
541
542         verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
543                         PolicyResult.SUCCESS);
544     }
545
546     /**
547      * Tests retry functions, when the outcome is {@code null}.
548      */
549     @Test
550     public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
551
552         // arrange to return null from doOperation()
553         oper = new MyOper() {
554             @Override
555             protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
556
557                 // update counters
558                 super.doOperation(attempt, outcome);
559                 return null;
560             }
561         };
562
563         verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, noop());
564     }
565
566     @Test
567     public void testSleep() throws Exception {
568         CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
569         assertTrue(future.isDone());
570         assertNull(future.get());
571
572         // edge case
573         future = oper.sleep(0, TimeUnit.SECONDS);
574         assertTrue(future.isDone());
575         assertNull(future.get());
576
577         /*
578          * Start a second sleep we can use to check the first while it's running.
579          */
580         tstart = Instant.now();
581         future = oper.sleep(100, TimeUnit.MILLISECONDS);
582
583         CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
584
585         // wait for second to complete and verify that the first has not completed
586         future2.get();
587         assertFalse(future.isDone());
588
589         // wait for second to complete
590         future.get();
591
592         long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
593         assertTrue(diff >= 99);
594     }
595
596     @Test
597     public void testIsSameOperation() {
598         assertFalse(oper.isSameOperation(null));
599
600         OperationOutcome outcome = params.makeOutcome(null);
601
602         // wrong actor - should be false
603         outcome.setActor(null);
604         assertFalse(oper.isSameOperation(outcome));
605         outcome.setActor(MY_SINK);
606         assertFalse(oper.isSameOperation(outcome));
607         outcome.setActor(ACTOR);
608
609         // wrong operation - should be null
610         outcome.setOperation(null);
611         assertFalse(oper.isSameOperation(outcome));
612         outcome.setOperation(MY_SINK);
613         assertFalse(oper.isSameOperation(outcome));
614         outcome.setOperation(OPERATION);
615
616         assertTrue(oper.isSameOperation(outcome));
617     }
618
619     /**
620      * Tests handleFailure() when the outcome is a success.
621      */
622     @Test
623     public void testHandlePreprocessorFailureSuccess() {
624         oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
625         verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
626     }
627
628     /**
629      * Tests handleFailure() when the outcome is <i>not</i> a success.
630      */
631     @Test
632     public void testHandlePreprocessorFailureFailed() throws Exception {
633         oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
634         verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
635     }
636
637     /**
638      * Tests handleFailure() when the outcome is {@code null}.
639      */
640     @Test
641     public void testHandlePreprocessorFailureNull() throws Exception {
642         // arrange to return a null outcome from the preprocessor
643         oper.setPreProc(CompletableFuture.completedFuture(null));
644         verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
645     }
646
647     @Test
648     public void testFromException() {
649         // arrange to generate an exception when operation runs
650         oper.setGenException(true);
651
652         verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
653     }
654
655     /**
656      * Tests fromException() when there is no exception.
657      */
658     @Test
659     public void testFromExceptionNoExcept() {
660         verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
661     }
662
663     /**
664      * Tests both flavors of anyOf(), because one invokes the other.
665      */
666     @Test
667     public void testAnyOf() throws Exception {
668         // first task completes, others do not
669         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
670
671         final OperationOutcome outcome = params.makeOutcome(null);
672
673         tasks.add(() -> CompletableFuture.completedFuture(outcome));
674         tasks.add(() -> new CompletableFuture<>());
675         tasks.add(() -> null);
676         tasks.add(() -> new CompletableFuture<>());
677
678         CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
679         assertTrue(executor.runAll(MAX_REQUESTS));
680         assertTrue(result.isDone());
681         assertSame(outcome, result.get());
682
683         // repeat using array form
684         @SuppressWarnings("unchecked")
685         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
686         result = oper.anyOf(tasks.toArray(taskArray));
687         assertTrue(executor.runAll(MAX_REQUESTS));
688         assertTrue(result.isDone());
689         assertSame(outcome, result.get());
690
691         // second task completes, others do not
692         tasks.clear();
693         tasks.add(() -> new CompletableFuture<>());
694         tasks.add(() -> CompletableFuture.completedFuture(outcome));
695         tasks.add(() -> new CompletableFuture<>());
696
697         result = oper.anyOf(tasks);
698         assertTrue(executor.runAll(MAX_REQUESTS));
699         assertTrue(result.isDone());
700         assertSame(outcome, result.get());
701
702         // third task completes, others do not
703         tasks.clear();
704         tasks.add(() -> new CompletableFuture<>());
705         tasks.add(() -> new CompletableFuture<>());
706         tasks.add(() -> CompletableFuture.completedFuture(outcome));
707
708         result = oper.anyOf(tasks);
709         assertTrue(executor.runAll(MAX_REQUESTS));
710         assertTrue(result.isDone());
711         assertSame(outcome, result.get());
712     }
713
714     /**
715      * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
716      */
717     @Test
718     @SuppressWarnings("unchecked")
719     public void testAnyOfEdge() throws Exception {
720         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
721
722         // zero items: check both using a list and using an array
723         assertNull(oper.anyOf(tasks));
724         assertNull(oper.anyOf());
725
726         // one item: : check both using a list and using an array
727         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
728         tasks.add(() -> future1);
729
730         assertSame(future1, oper.anyOf(tasks));
731         assertSame(future1, oper.anyOf(() -> future1));
732     }
733
734     @Test
735     public void testAllOfArray() throws Exception {
736         final OperationOutcome outcome = params.makeOutcome(null);
737
738         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
739         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
740         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
741
742         @SuppressWarnings("unchecked")
743         CompletableFuture<OperationOutcome> result =
744                         oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
745
746         assertTrue(executor.runAll(MAX_REQUESTS));
747         assertFalse(result.isDone());
748         future1.complete(outcome);
749
750         // complete 3 before 2
751         assertTrue(executor.runAll(MAX_REQUESTS));
752         assertFalse(result.isDone());
753         future3.complete(outcome);
754
755         assertTrue(executor.runAll(MAX_REQUESTS));
756         assertFalse(result.isDone());
757         future2.complete(outcome);
758
759         // all of them are now done
760         assertTrue(executor.runAll(MAX_REQUESTS));
761         assertTrue(result.isDone());
762         assertSame(outcome, result.get());
763     }
764
765     @Test
766     public void testAllOfList() throws Exception {
767         final OperationOutcome outcome = params.makeOutcome(null);
768
769         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
770         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
771         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
772
773         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
774         tasks.add(() -> future1);
775         tasks.add(() -> future2);
776         tasks.add(() -> null);
777         tasks.add(() -> future3);
778
779         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
780
781         assertTrue(executor.runAll(MAX_REQUESTS));
782         assertFalse(result.isDone());
783         future1.complete(outcome);
784
785         // complete 3 before 2
786         assertTrue(executor.runAll(MAX_REQUESTS));
787         assertFalse(result.isDone());
788         future3.complete(outcome);
789
790         assertTrue(executor.runAll(MAX_REQUESTS));
791         assertFalse(result.isDone());
792         future2.complete(outcome);
793
794         // all of them are now done
795         assertTrue(executor.runAll(MAX_REQUESTS));
796         assertTrue(result.isDone());
797         assertSame(outcome, result.get());
798     }
799
800     /**
801      * Tests both flavors of allOf(), for edge cases: zero items, and one item.
802      */
803     @Test
804     @SuppressWarnings("unchecked")
805     public void testAllOfEdge() throws Exception {
806         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
807
808         // zero items: check both using a list and using an array
809         assertNull(oper.allOf(tasks));
810         assertNull(oper.allOf());
811
812         // one item: : check both using a list and using an array
813         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
814         tasks.add(() -> future1);
815
816         assertSame(future1, oper.allOf(tasks));
817         assertSame(future1, oper.allOf(() -> future1));
818     }
819
820     @Test
821     public void testAttachFutures() throws Exception {
822         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
823
824         // third task throws an exception during construction
825         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
826         CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
827         CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
828         tasks.add(() -> future1);
829         tasks.add(() -> future2);
830         tasks.add(() -> {
831             throw new IllegalStateException(EXPECTED_EXCEPTION);
832         });
833         tasks.add(() -> future3);
834
835         assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
836
837         // should have canceled the first two, but not the last
838         assertTrue(future1.isCancelled());
839         assertTrue(future2.isCancelled());
840         assertFalse(future3.isCancelled());
841     }
842
843     @Test
844     public void testCombineOutcomes() throws Exception {
845         // only one outcome
846         verifyOutcomes(0, PolicyResult.SUCCESS);
847         verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
848
849         // maximum is in different positions
850         verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
851         verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
852         verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
853
854         // null outcome - takes precedence over a success
855         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
856         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
857         tasks.add(() -> CompletableFuture.completedFuture(null));
858         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
859         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
860
861         assertTrue(executor.runAll(MAX_REQUESTS));
862         assertTrue(result.isDone());
863         assertNull(result.get());
864
865         // one throws an exception during execution
866         IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
867
868         tasks.clear();
869         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
870         tasks.add(() -> CompletableFuture.failedFuture(except));
871         tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
872         result = oper.allOf(tasks);
873
874         assertTrue(executor.runAll(MAX_REQUESTS));
875         assertTrue(result.isCompletedExceptionally());
876         result.whenComplete((unused, thrown) -> assertSame(except, thrown));
877     }
878
879     /**
880      * Tests both flavors of sequence(), because one invokes the other.
881      */
882     @Test
883     public void testSequence() throws Exception {
884         final OperationOutcome outcome = params.makeOutcome(null);
885
886         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
887         tasks.add(() -> CompletableFuture.completedFuture(outcome));
888         tasks.add(() -> null);
889         tasks.add(() -> CompletableFuture.completedFuture(outcome));
890         tasks.add(() -> CompletableFuture.completedFuture(outcome));
891
892         CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
893         assertTrue(executor.runAll(MAX_REQUESTS));
894         assertTrue(result.isDone());
895         assertSame(outcome, result.get());
896
897         // repeat using array form
898         @SuppressWarnings("unchecked")
899         Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
900         result = oper.sequence(tasks.toArray(taskArray));
901         assertTrue(executor.runAll(MAX_REQUESTS));
902         assertTrue(result.isDone());
903         assertSame(outcome, result.get());
904
905         // second task fails, third should not run
906         OperationOutcome failure = params.makeOutcome(null);
907         failure.setResult(PolicyResult.FAILURE);
908         tasks.clear();
909         tasks.add(() -> CompletableFuture.completedFuture(outcome));
910         tasks.add(() -> CompletableFuture.completedFuture(failure));
911         tasks.add(() -> CompletableFuture.completedFuture(outcome));
912
913         result = oper.sequence(tasks);
914         assertTrue(executor.runAll(MAX_REQUESTS));
915         assertTrue(result.isDone());
916         assertSame(failure, result.get());
917     }
918
919     /**
920      * Tests both flavors of sequence(), for edge cases: zero items, and one item.
921      */
922     @Test
923     @SuppressWarnings("unchecked")
924     public void testSequenceEdge() throws Exception {
925         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
926
927         // zero items: check both using a list and using an array
928         assertNull(oper.sequence(tasks));
929         assertNull(oper.sequence());
930
931         // one item: : check both using a list and using an array
932         CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
933         tasks.add(() -> future1);
934
935         assertSame(future1, oper.sequence(tasks));
936         assertSame(future1, oper.sequence(() -> future1));
937     }
938
939     private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
940         List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
941
942         OperationOutcome expectedOutcome = null;
943
944         for (int count = 0; count < results.length; ++count) {
945             OperationOutcome outcome = params.makeOutcome(null);
946             outcome.setResult(results[count]);
947             tasks.add(() -> CompletableFuture.completedFuture(outcome));
948
949             if (count == expected) {
950                 expectedOutcome = outcome;
951             }
952         }
953
954         CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
955
956         assertTrue(executor.runAll(MAX_REQUESTS));
957         assertTrue(result.isDone());
958         assertSame(expectedOutcome, result.get());
959     }
960
961     @Test
962     public void testDetmPriority() throws CoderException {
963         assertEquals(1, oper.detmPriority(null));
964
965         OperationOutcome outcome = params.makeOutcome(null);
966
967         Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
968                         PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
969                         PolicyResult.FAILURE_EXCEPTION, 6);
970
971         for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
972             outcome.setResult(ent.getKey());
973             assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
974         }
975
976         /*
977          * Test null result. We can't actually set it to null, because the set() method
978          * won't allow it. Instead, we decode it from a structure.
979          */
980         outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
981         assertEquals(1, oper.detmPriority(outcome));
982     }
983
984     /**
985      * Tests callbackStarted() when the pipeline has already been stopped.
986      */
987     @Test
988     public void testCallbackStartedNotRunning() {
989         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
990
991         /*
992          * arrange to stop the controller when the start-callback is invoked, but capture
993          * the outcome
994          */
995         params = params.toBuilder().startCallback(oper -> {
996             starter(oper);
997             future.get().cancel(false);
998         }).build();
999
1000         // new params, thus need a new operation
1001         oper = new MyOper();
1002
1003         future.set(oper.start());
1004         assertTrue(executor.runAll(MAX_REQUESTS));
1005
1006         // should have only run once
1007         assertEquals(1, numStart);
1008     }
1009
1010     /**
1011      * Tests callbackCompleted() when the pipeline has already been stopped.
1012      */
1013     @Test
1014     public void testCallbackCompletedNotRunning() {
1015         AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1016
1017         // arrange to stop the controller when the start-callback is invoked
1018         params = params.toBuilder().startCallback(oper -> {
1019             future.get().cancel(false);
1020         }).build();
1021
1022         // new params, thus need a new operation
1023         oper = new MyOper();
1024
1025         future.set(oper.start());
1026         assertTrue(executor.runAll(MAX_REQUESTS));
1027
1028         // should not have been set
1029         assertNull(opend);
1030         assertEquals(0, numEnd);
1031     }
1032
1033     @Test
1034     public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1035         final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1036
1037         OperationOutcome outcome;
1038
1039         outcome = new OperationOutcome();
1040         oper.setOutcome(outcome, timex);
1041         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1042         assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1043
1044         outcome = new OperationOutcome();
1045         oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1046         assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1047         assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1048     }
1049
1050     @Test
1051     public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1052         OperationOutcome outcome;
1053
1054         outcome = new OperationOutcome();
1055         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1056         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1057         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1058
1059         oper.setOutcome(outcome, PolicyResult.SUCCESS);
1060         assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1061         assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1062
1063         for (PolicyResult result : FAILURE_RESULTS) {
1064             outcome = new OperationOutcome();
1065             oper.setOutcome(outcome, result);
1066             assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1067             assertEquals(result.toString(), result, outcome.getResult());
1068         }
1069     }
1070
1071     @Test
1072     public void testIsTimeout() {
1073         final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1074
1075         assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1076         assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1077         assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1078         assertFalse(oper.isTimeout(new CompletionException(null)));
1079         assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1080
1081         assertTrue(oper.isTimeout(timex));
1082         assertTrue(oper.isTimeout(new CompletionException(timex)));
1083     }
1084
1085     @Test
1086     public void testLogMessage() {
1087         final String infraStr = SINK_INFRA.toString();
1088
1089         // log structured data
1090         appender.clearExtractions();
1091         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1092         List<String> output = appender.getExtracted();
1093         assertEquals(1, output.size());
1094
1095         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1096                         .contains("{\n  \"text\": \"my-text\"\n}");
1097
1098         // repeat with a response
1099         appender.clearExtractions();
1100         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1101         output = appender.getExtracted();
1102         assertEquals(1, output.size());
1103
1104         assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1105                         .contains("{\n  \"text\": \"my-text\"\n}");
1106
1107         // log a plain string
1108         appender.clearExtractions();
1109         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1110         output = appender.getExtracted();
1111         assertEquals(1, output.size());
1112         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1113
1114         // log a null request
1115         appender.clearExtractions();
1116         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1117         output = appender.getExtracted();
1118         assertEquals(1, output.size());
1119
1120         assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1121
1122         // generate exception from coder
1123         setOperCoderException();
1124
1125         appender.clearExtractions();
1126         oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1127         output = appender.getExtracted();
1128         assertEquals(2, output.size());
1129         assertThat(output.get(0)).contains("cannot pretty-print request");
1130         assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1131
1132         // repeat with a response
1133         appender.clearExtractions();
1134         oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1135         output = appender.getExtracted();
1136         assertEquals(2, output.size());
1137         assertThat(output.get(0)).contains("cannot pretty-print response");
1138         assertThat(output.get(1)).contains(MY_SOURCE);
1139     }
1140
1141     @Test
1142     public void testGetRetry() {
1143         assertEquals(0, oper.getRetry(null));
1144         assertEquals(10, oper.getRetry(10));
1145     }
1146
1147     @Test
1148     public void testGetRetryWait() {
1149         // need an operator that doesn't override the retry time
1150         OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1151         assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1152     }
1153
1154     @Test
1155     public void testGetTargetEntity() {
1156         // get it from the params
1157         assertEquals(MY_TARGET_ENTITY, oper.getTargetEntity());
1158
1159         // now get it from the properties
1160         oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, "entityX");
1161         assertEquals("entityX", oper.getTargetEntity());
1162     }
1163
1164     @Test
1165     public void testGetTimeOutMs() {
1166         assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1167
1168         params = params.toBuilder().timeoutSec(null).build();
1169
1170         // new params, thus need a new operation
1171         oper = new MyOper();
1172
1173         assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1174     }
1175
1176     private void starter(OperationOutcome oper) {
1177         ++numStart;
1178         tstart = oper.getStart();
1179         opstart = oper;
1180         starts.add(oper);
1181     }
1182
1183     private void completer(OperationOutcome oper) {
1184         ++numEnd;
1185         opend = oper;
1186         ends.add(oper);
1187     }
1188
1189     /**
1190      * Gets a function that does nothing.
1191      *
1192      * @param <T> type of input parameter expected by the function
1193      * @return a function that does nothing
1194      */
1195     private <T> Consumer<T> noop() {
1196         return unused -> {
1197         };
1198     }
1199
1200     private OperationOutcome makeSuccess() {
1201         OperationOutcome outcome = params.makeOutcome(null);
1202         outcome.setResult(PolicyResult.SUCCESS);
1203
1204         return outcome;
1205     }
1206
1207     private OperationOutcome makeFailure() {
1208         OperationOutcome outcome = params.makeOutcome(null);
1209         outcome.setResult(PolicyResult.FAILURE);
1210
1211         return outcome;
1212     }
1213
1214     /**
1215      * Verifies a run.
1216      *
1217      * @param testName test name
1218      * @param expectedCallbacks number of callbacks expected
1219      * @param expectedOperations number of operation invocations expected
1220      * @param expectedResult expected outcome
1221      */
1222     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1223                     PolicyResult expectedResult) {
1224
1225         verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
1226     }
1227
1228     /**
1229      * Verifies a run.
1230      *
1231      * @param testName test name
1232      * @param expectedCallbacks number of callbacks expected
1233      * @param expectedOperations number of operation invocations expected
1234      * @param expectedResult expected outcome
1235      * @param manipulator function to modify the future returned by
1236      *        {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1237      *        in the executor are run
1238      */
1239     private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1240                     Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1241
1242         tstart = null;
1243         opstart = null;
1244         opend = null;
1245         starts.clear();
1246         ends.clear();
1247
1248         CompletableFuture<OperationOutcome> future = oper.start();
1249
1250         manipulator.accept(future);
1251
1252         assertTrue(testName, executor.runAll(MAX_REQUESTS));
1253
1254         assertEquals(testName, expectedCallbacks, numStart);
1255         assertEquals(testName, expectedCallbacks, numEnd);
1256
1257         if (expectedCallbacks > 0) {
1258             assertNotNull(testName, opstart);
1259             assertNotNull(testName, opend);
1260             assertEquals(testName, expectedResult, opend.getResult());
1261
1262             assertSame(testName, tstart, opstart.getStart());
1263             assertSame(testName, tstart, opend.getStart());
1264
1265             try {
1266                 assertTrue(future.isDone());
1267                 assertEquals(testName, opend, future.get());
1268
1269                 // "start" is never final
1270                 for (OperationOutcome outcome : starts) {
1271                     assertFalse(testName, outcome.isFinalOutcome());
1272                 }
1273
1274                 // only the last "complete" is final
1275                 assertTrue(testName, ends.removeLast().isFinalOutcome());
1276
1277                 for (OperationOutcome outcome : ends) {
1278                     assertFalse(outcome.isFinalOutcome());
1279                 }
1280
1281             } catch (InterruptedException | ExecutionException e) {
1282                 throw new IllegalStateException(e);
1283             }
1284
1285             if (expectedOperations > 0) {
1286                 assertNotNull(testName, oper.getSubRequestId());
1287                 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
1288                 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
1289             }
1290         }
1291
1292         assertEquals(testName, expectedOperations, oper.getCount());
1293     }
1294
1295     /**
1296      * Creates a new {@link #oper} whose coder will throw an exception.
1297      */
1298     private void setOperCoderException() {
1299         oper = new MyOper() {
1300             @Override
1301             protected Coder getCoder() {
1302                 return new StandardCoder() {
1303                     @Override
1304                     public String encode(Object object, boolean pretty) throws CoderException {
1305                         throw new CoderException(EXPECTED_EXCEPTION);
1306                     }
1307                 };
1308             }
1309         };
1310     }
1311
1312
1313     @Getter
1314     public static class MyData {
1315         private String text = TEXT;
1316     }
1317
1318
1319     private class MyOper extends OperationPartial {
1320         @Getter
1321         private int count = 0;
1322
1323         @Setter
1324         private boolean genException;
1325         @Setter
1326         private int maxFailures = 0;
1327         @Setter
1328         private CompletableFuture<OperationOutcome> preProc;
1329
1330
1331         public MyOper() {
1332             super(OperationPartialTest.this.params, config, PROP_NAMES);
1333         }
1334
1335         @Override
1336         protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1337             ++count;
1338             if (genException) {
1339                 throw new IllegalStateException(EXPECTED_EXCEPTION);
1340             }
1341
1342             operation.setSubRequestId(String.valueOf(attempt));
1343
1344             if (count > maxFailures) {
1345                 operation.setResult(PolicyResult.SUCCESS);
1346             } else {
1347                 operation.setResult(PolicyResult.FAILURE);
1348             }
1349
1350             return operation;
1351         }
1352
1353         @Override
1354         protected long getRetryWaitMs() {
1355             /*
1356              * Sleep timers run in the background, but we want to control things via the
1357              * "executor", thus we avoid sleep timers altogether by simply returning 0.
1358              */
1359             return 0L;
1360         }
1361
1362         @Override
1363         protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
1364             return (preProc != null ? preProc : super.startPreprocessorAsync());
1365         }
1366     }
1367 }