2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
33 import java.time.Instant;
34 import java.util.Arrays;
35 import java.util.LinkedList;
36 import java.util.List;
38 import java.util.Map.Entry;
39 import java.util.Queue;
40 import java.util.UUID;
41 import java.util.concurrent.CompletableFuture;
42 import java.util.concurrent.CompletionException;
43 import java.util.concurrent.ExecutionException;
44 import java.util.concurrent.Executor;
45 import java.util.concurrent.ForkJoinPool;
46 import java.util.concurrent.Future;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import java.util.concurrent.atomic.AtomicBoolean;
50 import java.util.concurrent.atomic.AtomicInteger;
51 import java.util.concurrent.atomic.AtomicReference;
52 import java.util.function.Consumer;
53 import java.util.function.Function;
54 import java.util.stream.Collectors;
57 import org.junit.Before;
58 import org.junit.Test;
59 import org.onap.policy.common.utils.coder.CoderException;
60 import org.onap.policy.common.utils.coder.StandardCoder;
61 import org.onap.policy.controlloop.ControlLoopOperation;
62 import org.onap.policy.controlloop.VirtualControlLoopEvent;
63 import org.onap.policy.controlloop.actorserviceprovider.Operation;
64 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
65 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
66 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
67 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
68 import org.onap.policy.controlloop.policy.PolicyResult;
70 public class OperationPartialTest {
71 private static final int MAX_PARALLEL_REQUESTS = 10;
72 private static final String EXPECTED_EXCEPTION = "expected exception";
73 private static final String ACTOR = "my-actor";
74 private static final String OPERATION = "my-operation";
75 private static final String TARGET = "my-target";
76 private static final int TIMEOUT = 1000;
77 private static final UUID REQ_ID = UUID.randomUUID();
79 private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
80 .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
82 private VirtualControlLoopEvent event;
83 private ControlLoopEventContext context;
84 private MyExec executor;
85 private ControlLoopOperationParams params;
92 private Instant tstart;
94 private OperationOutcome opstart;
95 private OperationOutcome opend;
97 private OperatorPartial operator;
100 * Initializes the fields, including {@link #oper}.
103 public void setUp() {
104 event = new VirtualControlLoopEvent();
105 event.setRequestId(REQ_ID);
107 context = new ControlLoopEventContext(event);
108 executor = new MyExec();
110 params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
111 .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
112 .startCallback(this::starter).targetEntity(TARGET).build();
114 operator = new OperatorPartial(ACTOR, OPERATION) {
116 public Executor getBlockingExecutor() {
121 public Operation buildOperation(ControlLoopOperationParams params) {
126 operator.configure(null);
138 public void testOperatorPartial_testGetActorName_testGetName() {
139 assertEquals(ACTOR, oper.getActorName());
140 assertEquals(OPERATION, oper.getName());
141 assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
145 public void testGetBlockingThread() throws Exception {
146 CompletableFuture<Void> future = new CompletableFuture<>();
148 // use the real executor
149 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
151 public Operation buildOperation(ControlLoopOperationParams params) {
156 oper2.getBlockingExecutor().execute(() -> future.complete(null));
158 assertNull(future.get(5, TimeUnit.SECONDS));
162 * Exercises the doXxx() methods.
165 public void testDoXxx() {
166 assertThatCode(() -> operator.doConfigure(null)).doesNotThrowAnyException();
167 assertThatCode(() -> operator.doStart()).doesNotThrowAnyException();
168 assertThatCode(() -> operator.doStop()).doesNotThrowAnyException();
169 assertThatCode(() -> operator.doShutdown()).doesNotThrowAnyException();
174 public void testStart() {
175 verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
179 * Tests startOperation() when the operator is not running.
182 public void testStartNotRunning() {
186 assertThatIllegalStateException().isThrownBy(() -> oper.start());
190 * Tests startOperation() when the operation has a preprocessor.
193 public void testStartWithPreprocessor() {
194 AtomicInteger count = new AtomicInteger();
196 CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
197 count.incrementAndGet();
198 return makeSuccess();
201 oper.setGuard(preproc);
203 verifyRun("testStartWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
205 assertEquals(1, count.get());
209 * Tests start() with multiple running requests.
212 public void testStartMultiple() {
213 for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
217 assertTrue(executor.runAll());
219 assertNotNull(opstart);
220 assertNotNull(opend);
221 assertEquals(PolicyResult.SUCCESS, opend.getResult());
223 assertEquals(MAX_PARALLEL_REQUESTS, numStart);
224 assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
225 assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
229 * Tests startPreprocessor() when the preprocessor returns a failure.
232 public void testStartPreprocessorFailure() {
233 oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
235 verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
239 * Tests startPreprocessor() when the preprocessor throws an exception.
242 public void testStartPreprocessorException() {
243 // arrange for the preprocessor to throw an exception
244 oper.setGuard(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
246 verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
250 * Tests startPreprocessor() when the pipeline is not running.
253 public void testStartPreprocessorNotRunning() {
254 // arrange for the preprocessor to return success, which will be ignored
255 oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
257 oper.start().cancel(false);
258 assertTrue(executor.runAll());
263 assertEquals(0, numStart);
264 assertEquals(0, oper.getCount());
265 assertEquals(0, numEnd);
269 * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
272 public void testStartPreprocessorBuilderException() {
273 oper = new MyOper() {
275 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
276 throw new IllegalStateException(EXPECTED_EXCEPTION);
280 assertThatIllegalStateException().isThrownBy(() -> oper.start());
282 // should be nothing in the queue
283 assertEquals(0, executor.getQueueLength());
287 public void testStartPreprocessorAsync() {
288 assertNull(oper.startPreprocessorAsync());
292 public void testStartGuardAsync() {
293 assertNull(oper.startGuardAsync());
297 public void testStartOperationAsync() {
299 assertTrue(executor.runAll());
301 assertEquals(1, oper.getCount());
305 public void testIsSuccess() {
306 OperationOutcome outcome = new OperationOutcome();
308 outcome.setResult(PolicyResult.SUCCESS);
309 assertTrue(oper.isSuccess(outcome));
311 for (PolicyResult failure : FAILURE_RESULTS) {
312 outcome.setResult(failure);
313 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
318 public void testIsActorFailed() {
319 assertFalse(oper.isActorFailed(null));
321 OperationOutcome outcome = params.makeOutcome();
324 outcome.setResult(PolicyResult.SUCCESS);
325 assertFalse(oper.isActorFailed(outcome));
327 outcome.setResult(PolicyResult.FAILURE_RETRIES);
328 assertFalse(oper.isActorFailed(outcome));
331 outcome.setResult(PolicyResult.FAILURE);
334 outcome.setActor(TARGET);
335 assertFalse(oper.isActorFailed(outcome));
336 outcome.setActor(null);
337 assertFalse(oper.isActorFailed(outcome));
338 outcome.setActor(ACTOR);
340 // incorrect operation
341 outcome.setOperation(TARGET);
342 assertFalse(oper.isActorFailed(outcome));
343 outcome.setOperation(null);
344 assertFalse(oper.isActorFailed(outcome));
345 outcome.setOperation(OPERATION);
348 assertTrue(oper.isActorFailed(outcome));
352 public void testDoOperation() {
354 * Use an operation that doesn't override doOperation().
356 OperationPartial oper2 = new OperationPartial(params, operator) {};
359 assertTrue(executor.runAll());
361 assertNotNull(opend);
362 assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
366 public void testTimeout() throws Exception {
368 // use a real executor
369 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
371 // trigger timeout very quickly
372 oper = new MyOper() {
374 protected long getTimeoutMs(Integer timeoutSec) {
379 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
381 OperationOutcome outcome2 = params.makeOutcome();
382 outcome2.setResult(PolicyResult.SUCCESS);
385 * Create an incomplete future that will timeout after the operation's
386 * timeout. If it fires before the other timer, then it will return a
389 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
390 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
391 params.getExecutor());
397 assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
401 * Tests retry functions, when the count is set to zero and retries are exhausted.
404 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
405 params = params.toBuilder().retry(0).build();
407 // new params, thus need a new operation
410 oper.setMaxFailures(10);
412 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
416 * Tests retry functions, when the count is null and retries are exhausted.
419 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
420 params = params.toBuilder().retry(null).build();
422 // new params, thus need a new operation
425 oper.setMaxFailures(10);
427 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
431 * Tests retry functions, when retries are exhausted.
434 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
435 final int maxRetries = 3;
436 params = params.toBuilder().retry(maxRetries).build();
438 // new params, thus need a new operation
441 oper.setMaxFailures(10);
443 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
444 PolicyResult.FAILURE_RETRIES);
448 * Tests retry functions, when a success follows some retries.
451 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
452 params = params.toBuilder().retry(10).build();
454 // new params, thus need a new operation
457 final int maxFailures = 3;
458 oper.setMaxFailures(maxFailures);
460 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
461 PolicyResult.SUCCESS);
465 * Tests retry functions, when the outcome is {@code null}.
468 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
470 // arrange to return null from doOperation()
471 oper = new MyOper() {
473 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
476 super.doOperation(attempt, operation);
481 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
485 public void testSleep() throws Exception {
486 CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
487 assertTrue(future.isDone());
488 assertNull(future.get());
491 future = oper.sleep(0, TimeUnit.SECONDS);
492 assertTrue(future.isDone());
493 assertNull(future.get());
496 * Start a second sleep we can use to check the first while it's running.
498 tstart = Instant.now();
499 future = oper.sleep(100, TimeUnit.MILLISECONDS);
501 CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
503 // wait for second to complete and verify that the first has not completed
505 assertFalse(future.isDone());
507 // wait for second to complete
510 long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
511 assertTrue(diff >= 99);
515 public void testIsSameOperation() {
516 assertFalse(oper.isSameOperation(null));
518 OperationOutcome outcome = params.makeOutcome();
520 // wrong actor - should be false
521 outcome.setActor(null);
522 assertFalse(oper.isSameOperation(outcome));
523 outcome.setActor(TARGET);
524 assertFalse(oper.isSameOperation(outcome));
525 outcome.setActor(ACTOR);
527 // wrong operation - should be null
528 outcome.setOperation(null);
529 assertFalse(oper.isSameOperation(outcome));
530 outcome.setOperation(TARGET);
531 assertFalse(oper.isSameOperation(outcome));
532 outcome.setOperation(OPERATION);
534 assertTrue(oper.isSameOperation(outcome));
538 * Tests handleFailure() when the outcome is a success.
541 public void testHandlePreprocessorFailureTrue() {
542 oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
543 verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
547 * Tests handleFailure() when the outcome is <i>not</i> a success.
550 public void testHandlePreprocessorFailureFalse() throws Exception {
551 oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
552 verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
556 * Tests handleFailure() when the outcome is {@code null}.
559 public void testHandlePreprocessorFailureNull() throws Exception {
560 // arrange to return null from the preprocessor
561 oper.setGuard(CompletableFuture.completedFuture(null));
563 verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
567 public void testFromException() {
568 // arrange to generate an exception when operation runs
569 oper.setGenException(true);
571 verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
575 * Tests fromException() when there is no exception.
578 public void testFromExceptionNoExcept() {
579 verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
583 * Tests both flavors of anyOf(), because one invokes the other.
586 public void testAnyOf() throws Exception {
587 // first task completes, others do not
588 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
590 final OperationOutcome outcome = params.makeOutcome();
592 tasks.add(CompletableFuture.completedFuture(outcome));
593 tasks.add(new CompletableFuture<>());
594 tasks.add(new CompletableFuture<>());
596 CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
597 assertTrue(executor.runAll());
599 assertTrue(result.isDone());
600 assertSame(outcome, result.get());
602 // second task completes, others do not
603 tasks = new LinkedList<>();
605 tasks.add(new CompletableFuture<>());
606 tasks.add(CompletableFuture.completedFuture(outcome));
607 tasks.add(new CompletableFuture<>());
609 result = oper.anyOf(tasks);
610 assertTrue(executor.runAll());
612 assertTrue(result.isDone());
613 assertSame(outcome, result.get());
615 // third task completes, others do not
616 tasks = new LinkedList<>();
618 tasks.add(new CompletableFuture<>());
619 tasks.add(new CompletableFuture<>());
620 tasks.add(CompletableFuture.completedFuture(outcome));
622 result = oper.anyOf(tasks);
623 assertTrue(executor.runAll());
625 assertTrue(result.isDone());
626 assertSame(outcome, result.get());
630 * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
633 @SuppressWarnings("unchecked")
634 public void testAnyOfEdge() throws Exception {
635 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
637 // zero items: check both using a list and using an array
638 assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf(tasks));
639 assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf());
641 // one item: : check both using a list and using an array
642 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
645 assertSame(future1, oper.anyOf(tasks));
646 assertSame(future1, oper.anyOf(future1));
650 * Tests both flavors of allOf(), because one invokes the other.
653 public void testAllOf() throws Exception {
654 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
656 final OperationOutcome outcome = params.makeOutcome();
658 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
659 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
660 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
666 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
668 assertTrue(executor.runAll());
669 assertFalse(result.isDone());
670 future1.complete(outcome);
672 // complete 3 before 2
673 assertTrue(executor.runAll());
674 assertFalse(result.isDone());
675 future3.complete(outcome);
677 assertTrue(executor.runAll());
678 assertFalse(result.isDone());
679 future2.complete(outcome);
681 // all of them are now done
682 assertTrue(executor.runAll());
683 assertTrue(result.isDone());
684 assertSame(outcome, result.get());
688 * Tests both flavors of allOf(), for edge cases: zero items, and one item.
691 @SuppressWarnings("unchecked")
692 public void testAllOfEdge() throws Exception {
693 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
695 // zero items: check both using a list and using an array
696 assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf(tasks));
697 assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf());
699 // one item: : check both using a list and using an array
700 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
703 assertSame(future1, oper.allOf(tasks));
704 assertSame(future1, oper.allOf(future1));
708 public void testCombineOutcomes() throws Exception {
710 verifyOutcomes(0, PolicyResult.SUCCESS);
711 verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
713 // maximum is in different positions
714 verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
715 verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
716 verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
719 final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
720 tasks.add(CompletableFuture.completedFuture(null));
721 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
723 assertTrue(executor.runAll());
724 assertTrue(result.isDone());
725 assertNull(result.get());
727 // one throws an exception during execution
728 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
731 tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
732 tasks.add(CompletableFuture.failedFuture(except));
733 tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
734 result = oper.allOf(tasks);
736 assertTrue(executor.runAll());
737 assertTrue(result.isCompletedExceptionally());
738 result.whenComplete((unused, thrown) -> assertSame(except, thrown));
741 private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
742 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
745 OperationOutcome expectedOutcome = null;
747 for (int count = 0; count < results.length; ++count) {
748 OperationOutcome outcome = params.makeOutcome();
749 outcome.setResult(results[count]);
750 tasks.add(CompletableFuture.completedFuture(outcome));
752 if (count == expected) {
753 expectedOutcome = outcome;
757 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
759 assertTrue(executor.runAll());
760 assertTrue(result.isDone());
761 assertSame(expectedOutcome, result.get());
764 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
765 final OperationOutcome taskOutcome) {
767 return outcome -> CompletableFuture.completedFuture(taskOutcome);
771 public void testDetmPriority() throws CoderException {
772 assertEquals(1, oper.detmPriority(null));
774 OperationOutcome outcome = params.makeOutcome();
776 Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
777 PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
778 PolicyResult.FAILURE_EXCEPTION, 6);
780 for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
781 outcome.setResult(ent.getKey());
782 assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
786 * Test null result. We can't actually set it to null, because the set() method
787 * won't allow it. Instead, we decode it from a structure.
789 outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
790 assertEquals(1, oper.detmPriority(outcome));
794 * Tests doTask(Future) when the controller is not running.
797 public void testDoTaskFutureNotRunning() throws Exception {
798 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
800 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
801 controller.complete(params.makeOutcome());
803 CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, params.makeOutcome(), taskFuture);
804 assertFalse(future.isDone());
805 assertTrue(executor.runAll());
807 // should not have run the task
808 assertFalse(future.isDone());
810 // should have canceled the task future
811 assertTrue(taskFuture.isCancelled());
815 * Tests doTask(Future) when the previous outcome was successful.
818 public void testDoTaskFutureSuccess() throws Exception {
819 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
820 final OperationOutcome taskOutcome = params.makeOutcome();
822 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
824 CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, params.makeOutcome(), taskFuture);
826 taskFuture.complete(taskOutcome);
827 assertTrue(executor.runAll());
829 assertTrue(future.isDone());
830 assertSame(taskOutcome, future.get());
832 // controller should not be done yet
833 assertFalse(controller.isDone());
837 * Tests doTask(Future) when the previous outcome was failed.
840 public void testDoTaskFutureFailure() throws Exception {
841 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
842 final OperationOutcome failedOutcome = params.makeOutcome();
843 failedOutcome.setResult(PolicyResult.FAILURE);
845 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
847 CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, failedOutcome, taskFuture);
848 assertFalse(future.isDone());
849 assertTrue(executor.runAll());
851 // should not have run the task
852 assertFalse(future.isDone());
854 // should have canceled the task future
855 assertTrue(taskFuture.isCancelled());
857 // controller SHOULD be done now
858 assertTrue(controller.isDone());
859 assertSame(failedOutcome, controller.get());
863 * Tests doTask(Future) when the previous outcome was failed, but not checking
867 public void testDoTaskFutureUncheckedFailure() throws Exception {
868 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
869 final OperationOutcome failedOutcome = params.makeOutcome();
870 failedOutcome.setResult(PolicyResult.FAILURE);
872 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
874 CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, failedOutcome, taskFuture);
875 assertFalse(future.isDone());
878 OperationOutcome taskOutcome = params.makeOutcome();
879 taskFuture.complete(taskOutcome);
881 assertTrue(executor.runAll());
883 // should have run the task
884 assertTrue(future.isDone());
886 assertTrue(future.isDone());
887 assertSame(taskOutcome, future.get());
889 // controller should not be done yet
890 assertFalse(controller.isDone());
894 * Tests doTask(Function) when the controller is not running.
897 public void testDoTaskFunctionNotRunning() throws Exception {
898 AtomicBoolean invoked = new AtomicBoolean();
900 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
902 return CompletableFuture.completedFuture(params.makeOutcome());
905 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
906 controller.complete(params.makeOutcome());
908 CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(params.makeOutcome());
909 assertFalse(future.isDone());
910 assertTrue(executor.runAll());
912 // should not have run the task
913 assertFalse(future.isDone());
915 // should not have even invoked the task
916 assertFalse(invoked.get());
920 * Tests doTask(Function) when the previous outcome was successful.
923 public void testDoTaskFunctionSuccess() throws Exception {
924 final OperationOutcome taskOutcome = params.makeOutcome();
926 final OperationOutcome failedOutcome = params.makeOutcome();
928 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
930 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
932 CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
934 assertTrue(future.isDone());
935 assertSame(taskOutcome, future.get());
937 // controller should not be done yet
938 assertFalse(controller.isDone());
942 * Tests doTask(Function) when the previous outcome was failed.
945 public void testDoTaskFunctionFailure() throws Exception {
946 final OperationOutcome failedOutcome = params.makeOutcome();
947 failedOutcome.setResult(PolicyResult.FAILURE);
949 AtomicBoolean invoked = new AtomicBoolean();
951 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
953 return CompletableFuture.completedFuture(params.makeOutcome());
956 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
958 CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
959 assertFalse(future.isDone());
960 assertTrue(executor.runAll());
962 // should not have run the task
963 assertFalse(future.isDone());
965 // should not have even invoked the task
966 assertFalse(invoked.get());
968 // controller should have the failed task
969 assertTrue(controller.isDone());
970 assertSame(failedOutcome, controller.get());
974 * Tests doTask(Function) when the previous outcome was failed, but not checking
978 public void testDoTaskFunctionUncheckedFailure() throws Exception {
979 final OperationOutcome taskOutcome = params.makeOutcome();
981 final OperationOutcome failedOutcome = params.makeOutcome();
982 failedOutcome.setResult(PolicyResult.FAILURE);
984 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
986 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
988 CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(failedOutcome);
990 assertTrue(future.isDone());
991 assertSame(taskOutcome, future.get());
993 // controller should not be done yet
994 assertFalse(controller.isDone());
998 * Tests callbackStarted() when the pipeline has already been stopped.
1001 public void testCallbackStartedNotRunning() {
1002 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1005 * arrange to stop the controller when the start-callback is invoked, but capture
1008 params = params.toBuilder().startCallback(oper -> {
1010 future.get().cancel(false);
1013 // new params, thus need a new operation
1014 oper = new MyOper();
1016 future.set(oper.start());
1017 assertTrue(executor.runAll());
1019 // should have only run once
1020 assertEquals(1, numStart);
1024 * Tests callbackCompleted() when the pipeline has already been stopped.
1027 public void testCallbackCompletedNotRunning() {
1028 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1030 // arrange to stop the controller when the start-callback is invoked
1031 params = params.toBuilder().startCallback(oper -> {
1032 future.get().cancel(false);
1035 // new params, thus need a new operation
1036 oper = new MyOper();
1038 future.set(oper.start());
1039 assertTrue(executor.runAll());
1041 // should not have been set
1043 assertEquals(0, numEnd);
1047 public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1048 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1050 OperationOutcome outcome;
1052 outcome = new OperationOutcome();
1053 oper.setOutcome(outcome, timex);
1054 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1055 assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1057 outcome = new OperationOutcome();
1058 oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1059 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1060 assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1064 public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1065 OperationOutcome outcome;
1067 outcome = new OperationOutcome();
1068 oper.setOutcome(outcome, PolicyResult.SUCCESS);
1069 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1070 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1072 for (PolicyResult result : FAILURE_RESULTS) {
1073 outcome = new OperationOutcome();
1074 oper.setOutcome(outcome, result);
1075 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1076 assertEquals(result.toString(), result, outcome.getResult());
1081 public void testIsTimeout() {
1082 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1084 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1085 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1086 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1087 assertFalse(oper.isTimeout(new CompletionException(null)));
1088 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1090 assertTrue(oper.isTimeout(timex));
1091 assertTrue(oper.isTimeout(new CompletionException(timex)));
1095 public void testGetRetry() {
1096 assertEquals(0, oper.getRetry(null));
1097 assertEquals(10, oper.getRetry(10));
1101 public void testGetRetryWait() {
1102 // need an operator that doesn't override the retry time
1103 OperationPartial oper2 = new OperationPartial(params, operator) {};
1104 assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1108 public void testGetTimeOutMs() {
1109 assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1111 params = params.toBuilder().timeoutSec(null).build();
1113 // new params, thus need a new operation
1114 oper = new MyOper();
1116 assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1119 private void starter(OperationOutcome oper) {
1121 tstart = oper.getStart();
1125 private void completer(OperationOutcome oper) {
1131 * Gets a function that does nothing.
1133 * @param <T> type of input parameter expected by the function
1134 * @return a function that does nothing
1136 private <T> Consumer<T> noop() {
1141 private OperationOutcome makeSuccess() {
1142 OperationOutcome outcome = params.makeOutcome();
1143 outcome.setResult(PolicyResult.SUCCESS);
1148 private OperationOutcome makeFailure() {
1149 OperationOutcome outcome = params.makeOutcome();
1150 outcome.setResult(PolicyResult.FAILURE);
1158 * @param testName test name
1159 * @param expectedCallbacks number of callbacks expected
1160 * @param expectedOperations number of operation invocations expected
1161 * @param expectedResult expected outcome
1163 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1164 PolicyResult expectedResult) {
1166 String expectedSubRequestId =
1167 (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1169 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1175 * @param testName test name
1176 * @param expectedCallbacks number of callbacks expected
1177 * @param expectedOperations number of operation invocations expected
1178 * @param expectedResult expected outcome
1179 * @param expectedSubRequestId expected sub request ID
1180 * @param manipulator function to modify the future returned by
1181 * {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1182 * in the executor are run
1184 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1185 String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1187 CompletableFuture<OperationOutcome> future = oper.start();
1189 manipulator.accept(future);
1191 assertTrue(testName, executor.runAll());
1193 assertEquals(testName, expectedCallbacks, numStart);
1194 assertEquals(testName, expectedCallbacks, numEnd);
1196 if (expectedCallbacks > 0) {
1197 assertNotNull(testName, opstart);
1198 assertNotNull(testName, opend);
1199 assertEquals(testName, expectedResult, opend.getResult());
1201 assertSame(testName, tstart, opstart.getStart());
1202 assertSame(testName, tstart, opend.getStart());
1205 assertTrue(future.isDone());
1206 assertSame(testName, opend, future.get());
1208 } catch (InterruptedException | ExecutionException e) {
1209 throw new IllegalStateException(e);
1212 if (expectedOperations > 0) {
1213 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1217 assertEquals(testName, expectedOperations, oper.getCount());
1220 private class MyOper extends OperationPartial {
1222 private int count = 0;
1225 private boolean genException;
1228 private int maxFailures = 0;
1231 private CompletableFuture<OperationOutcome> guard;
1235 super(OperationPartialTest.this.params, operator);
1239 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1242 throw new IllegalStateException(EXPECTED_EXCEPTION);
1245 operation.setSubRequestId(String.valueOf(attempt));
1247 if (count > maxFailures) {
1248 operation.setResult(PolicyResult.SUCCESS);
1250 operation.setResult(PolicyResult.FAILURE);
1257 protected CompletableFuture<OperationOutcome> startGuardAsync() {
1258 return (guard != null ? guard : super.startGuardAsync());
1262 protected long getRetryWaitMs() {
1264 * Sleep timers run in the background, but we want to control things via the
1265 * "executor", thus we avoid sleep timers altogether by simply returning 0.
1272 * Executor that will run tasks until the queue is empty or a maximum number of tasks
1273 * have been executed. Doesn't actually run anything until {@link #runAll()} is
1276 private static class MyExec implements Executor {
1277 private static final int MAX_TASKS = MAX_PARALLEL_REQUESTS * 100;
1279 private Queue<Runnable> commands = new LinkedList<>();
1285 public int getQueueLength() {
1286 return commands.size();
1290 public void execute(Runnable command) {
1291 commands.add(command);
1294 public boolean runAll() {
1295 for (int count = 0; count < MAX_TASKS && !commands.isEmpty(); ++count) {
1296 commands.remove().run();
1299 return commands.isEmpty();