2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
33 import java.time.Instant;
34 import java.util.Arrays;
35 import java.util.LinkedList;
36 import java.util.List;
38 import java.util.Map.Entry;
39 import java.util.UUID;
40 import java.util.concurrent.CompletableFuture;
41 import java.util.concurrent.CompletionException;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.Executor;
44 import java.util.concurrent.ForkJoinPool;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import java.util.concurrent.atomic.AtomicBoolean;
49 import java.util.concurrent.atomic.AtomicInteger;
50 import java.util.concurrent.atomic.AtomicReference;
51 import java.util.function.Consumer;
52 import java.util.function.Function;
53 import java.util.stream.Collectors;
56 import org.junit.Before;
57 import org.junit.Test;
58 import org.onap.policy.common.utils.coder.CoderException;
59 import org.onap.policy.common.utils.coder.StandardCoder;
60 import org.onap.policy.controlloop.ControlLoopOperation;
61 import org.onap.policy.controlloop.VirtualControlLoopEvent;
62 import org.onap.policy.controlloop.actorserviceprovider.Operation;
63 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
64 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
65 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
66 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
67 import org.onap.policy.controlloop.policy.PolicyResult;
69 public class OperationPartialTest {
// Number of requests started by testStartMultiple(); also used to compute the MyExec argument in setUp().
70 private static final int MAX_PARALLEL_REQUESTS = 10;
71 private static final String EXPECTED_EXCEPTION = "expected exception";
72 private static final String ACTOR = "my-actor";
73 private static final String OPERATION = "my-operation";
74 private static final String TARGET = "my-target";
// Passed to timeoutSec() in setUp(), i.e. expressed in seconds.
75 private static final int TIMEOUT = 1000;
76 private static final UUID REQ_ID = UUID.randomUUID();
// Every PolicyResult value other than SUCCESS.
78 private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
79 .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
// Fixture objects that are rebuilt by setUp() before each test.
81 private VirtualControlLoopEvent event;
82 private ControlLoopEventContext context;
83 private MyExec executor;
84 private ControlLoopOperationParams params;
// Start time; written by starter() and by testSleep().
91 private Instant tstart;
// NOTE(review): presumably the most recent start/end outcomes captured by the callbacks - confirm.
93 private OperationOutcome opstart;
94 private OperationOutcome opend;
96 private OperatorPartial operator;
99 * Initializes the fields, including {@link #oper}.
102 public void setUp() {
// Build a fresh event, context, executor and parameter set for each test.
103 event = new VirtualControlLoopEvent();
104 event.setRequestId(REQ_ID);
106 context = new ControlLoopEventContext(event);
107 executor = new MyExec(100 * MAX_PARALLEL_REQUESTS);
// The start and complete callbacks are routed to starter() and completer() of this test class.
109 params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
110 .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
111 .startCallback(this::starter).targetEntity(TARGET).build();
// Anonymous operator that overrides getBlockingExecutor() and buildOperation().
113 operator = new OperatorPartial(ACTOR, OPERATION) {
115 public Executor getBlockingExecutor() {
120 public Operation buildOperation(ControlLoopOperationParams params) {
// The operator is configured with null parameters.
125 operator.configure(null);
137 public void testOperatorPartial_testGetActorName_testGetName() {
// The full name is expected to be the actor name and operation name joined by a period.
138 assertEquals(ACTOR, oper.getActorName());
139 assertEquals(OPERATION, oper.getName());
140 assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
144 public void testGetBlockingThread() throws Exception {
// Submits a task to the blocking executor of a plain OperatorPartial and waits up to five seconds for it to run.
145 CompletableFuture<Void> future = new CompletableFuture<>();
147 // use the real executor
148 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
150 public Operation buildOperation(ControlLoopOperationParams params) {
155 oper2.getBlockingExecutor().execute(() -> future.complete(null));
// The future is completed with null, so get() returning null proves the task ran in time.
157 assertNull(future.get(5, TimeUnit.SECONDS));
161 * Exercises the doXxx() methods.
164 public void testDoXxx() {
// Each lifecycle hook is invoked directly on the operator and must not throw.
165 assertThatCode(() -> operator.doConfigure(null)).doesNotThrowAnyException();
166 assertThatCode(() -> operator.doStart()).doesNotThrowAnyException();
167 assertThatCode(() -> operator.doStop()).doesNotThrowAnyException();
168 assertThatCode(() -> operator.doShutdown()).doesNotThrowAnyException();
173 public void testStart() {
// Expects one callback, one operation invocation and a SUCCESS result.
174 verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
178 * Tests startOperation() when the operator is not running.
181 public void testStartNotRunning() {
// start() is expected to be rejected with an IllegalStateException.
185 assertThatIllegalStateException().isThrownBy(() -> oper.start());
189 * Tests startOperation() when the operation has a preprocessor.
192 public void testStartWithPreprocessor() {
// Counts how many times the asynchronous guard supplier is executed.
193 AtomicInteger count = new AtomicInteger();
195 CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
196 count.incrementAndGet();
197 return makeSuccess();
// Install the supplier future as the guard of the operation under test.
200 oper.setGuard(preproc);
202 verifyRun("testStartWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
// The guard supplier must have run exactly once.
204 assertEquals(1, count.get());
208 * Tests start() with multiple running requests.
211 public void testStartMultiple() {
// One iteration per parallel request; the loop body is not visible in this excerpt.
212 for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
216 assertTrue(executor.runAll());
218 assertNotNull(opstart);
219 assertNotNull(opend);
220 assertEquals(PolicyResult.SUCCESS, opend.getResult());
// The start, operation and end counters must each equal the number of requests.
222 assertEquals(MAX_PARALLEL_REQUESTS, numStart);
223 assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
224 assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
228 * Tests startPreprocessor() when the preprocessor returns a failure.
231 public void testStartPreprocessorFailure() {
// The guard completes with a FAILURE outcome.
232 oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
// Expects one callback, zero operation invocations and a FAILURE_GUARD result.
234 verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
238 * Tests startPreprocessor() when the preprocessor throws an exception.
241 public void testStartPreprocessorException() {
242 // arrange for the preprocessor to throw an exception
243 oper.setGuard(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
// Expects one callback, zero operation invocations and a FAILURE_GUARD result.
245 verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
249 * Tests startPreprocessor() when the pipeline is not running.
252 public void testStartPreprocessorNotRunning() {
253 // arrange for the preprocessor to return success, which will be ignored
254 oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
// Cancel the future returned by start() before the executor gets to run anything.
256 oper.start().cancel(false);
257 assertTrue(executor.runAll());
// Nothing should have been started, executed or completed.
262 assertEquals(0, numStart);
263 assertEquals(0, oper.getCount());
264 assertEquals(0, numEnd);
268 * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
271 public void testStartPreprocessorBuilderException() {
// Replace the operation with one whose startPreprocessorAsync() throws instead of returning a future.
272 oper = new MyOper() {
274 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
275 throw new IllegalStateException(EXPECTED_EXCEPTION);
// The exception is expected to propagate out of start().
279 assertThatIllegalStateException().isThrownBy(() -> oper.start());
281 // should be nothing in the queue
282 assertEquals(0, executor.getQueueLength());
286 public void testStartPreprocessorAsync() {
// The test operation is expected to return null here.
287 assertNull(oper.startPreprocessorAsync());
291 public void testStartGuardAsync() {
// The test operation is expected to return null here.
292 assertNull(oper.startGuardAsync());
296 public void testStartOperationAsync() {
// After the executor drains, exactly one operation invocation must have been counted.
298 assertTrue(executor.runAll());
300 assertEquals(1, oper.getCount());
304 public void testIsSuccess() {
// SUCCESS must be accepted, and every value in FAILURE_RESULTS must be rejected.
305 OperationOutcome outcome = new OperationOutcome();
307 outcome.setResult(PolicyResult.SUCCESS);
308 assertTrue(oper.isSuccess(outcome));
310 for (PolicyResult failure : FAILURE_RESULTS) {
311 outcome.setResult(failure);
312 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
317 public void testIsActorFailed() {
// The only case asserted true is a FAILURE result carrying the matching actor and operation.
318 assertFalse(oper.isActorFailed(null));
320 OperationOutcome outcome = params.makeOutcome();
// results other than FAILURE
323 outcome.setResult(PolicyResult.SUCCESS);
324 assertFalse(oper.isActorFailed(outcome));
326 outcome.setResult(PolicyResult.FAILURE_RETRIES);
327 assertFalse(oper.isActorFailed(outcome));
330 outcome.setResult(PolicyResult.FAILURE);
// incorrect actor
333 outcome.setActor(TARGET);
334 assertFalse(oper.isActorFailed(outcome));
335 outcome.setActor(null);
336 assertFalse(oper.isActorFailed(outcome));
337 outcome.setActor(ACTOR);
339 // incorrect operation
340 outcome.setOperation(TARGET);
341 assertFalse(oper.isActorFailed(outcome));
342 outcome.setOperation(null);
343 assertFalse(oper.isActorFailed(outcome));
344 outcome.setOperation(OPERATION);
// FAILURE result with correct actor and operation
347 assertTrue(oper.isActorFailed(outcome));
351 public void testDoOperation() {
// Running the base-class doOperation() is expected to end in FAILURE_EXCEPTION.
353 * Use an operation that doesn't override doOperation().
355 OperationPartial oper2 = new OperationPartial(params, operator) {};
358 assertTrue(executor.runAll());
360 assertNotNull(opend);
361 assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
365 public void testTimeout() throws Exception {
// Expects start() to complete with FAILURE_TIMEOUT when the operation future does not finish in time.
367 // use a real executor
368 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
370 // trigger timeout very quickly
371 oper = new MyOper() {
373 protected long getTimeoutMs(Integer timeoutSec) {
378 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
380 OperationOutcome outcome2 = params.makeOutcome();
381 outcome2.setResult(PolicyResult.SUCCESS);
// NOTE(review): outcome2 is prepared with SUCCESS here, but the handler below returns outcome,
// so outcome2 is unused in the visible code - confirm which one was intended.
384 * Create an incomplete future that will timeout after the operation's
385 * timeout. If it fires before the other timer, then it will return a
388 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
389 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
390 params.getExecutor());
396 assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
400 * Tests retry functions, when the count is set to zero and retries are exhausted.
403 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
// With a retry count of zero, a single attempt is expected and the result is plain FAILURE.
404 params = params.toBuilder().retry(0).build();
406 // new params, thus need a new operation
409 oper.setMaxFailures(10);
411 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
415 * Tests retry functions, when the count is null and retries are exhausted.
418 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
// With a null retry count, a single attempt is expected and the result is plain FAILURE.
419 params = params.toBuilder().retry(null).build();
421 // new params, thus need a new operation
424 oper.setMaxFailures(10);
426 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
430 * Tests retry functions, when retries are exhausted.
433 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
434 final int maxRetries = 3;
435 params = params.toBuilder().retry(maxRetries).build();
437 // new params, thus need a new operation
// More failures are allowed than there are retries, so the retries run out first.
440 oper.setMaxFailures(10);
// Expects the initial attempt plus maxRetries retries, ending in FAILURE_RETRIES.
442 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
443 PolicyResult.FAILURE_RETRIES);
447 * Tests retry functions, when a success follows some retries.
450 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
451 params = params.toBuilder().retry(10).build();
453 // new params, thus need a new operation
// Fewer failures than allowed retries, so a later attempt can succeed.
456 final int maxFailures = 3;
457 oper.setMaxFailures(maxFailures);
// Expects maxFailures failed attempts followed by one successful attempt.
459 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
460 PolicyResult.SUCCESS);
464 * Tests retry functions, when the outcome is {@code null}.
467 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
469 // arrange to return null from doOperation()
470 oper = new MyOper() {
472 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
// The result of the superclass call is discarded.
475 super.doOperation(attempt, operation);
// Expects FAILURE, with null passed as the expected sub-request ID.
480 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
484 public void testSleep() throws Exception {
// A negative delay must yield a future that is already complete.
485 CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
486 assertTrue(future.isDone());
487 assertNull(future.get());
// A zero delay must also yield a future that is already complete.
490 future = oper.sleep(0, TimeUnit.SECONDS);
491 assertTrue(future.isDone());
492 assertNull(future.get());
495 * Start a second sleep we can use to check the first while it's running.
497 tstart = Instant.now();
498 future = oper.sleep(100, TimeUnit.MILLISECONDS);
500 CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
502 // wait for second to complete and verify that the first has not completed
504 assertFalse(future.isDone());
506 // wait for first to complete
// The elapsed time is measured from tstart and must cover nearly all of the 100 ms sleep.
509 long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
510 assertTrue(diff >= 99);
514 public void testIsSameOperation() {
// The only case asserted true is an outcome whose actor and operation both match.
515 assertFalse(oper.isSameOperation(null));
517 OperationOutcome outcome = params.makeOutcome();
519 // wrong actor - should be false
520 outcome.setActor(null);
521 assertFalse(oper.isSameOperation(outcome));
522 outcome.setActor(TARGET);
523 assertFalse(oper.isSameOperation(outcome));
524 outcome.setActor(ACTOR);
526 // wrong operation - should be false
527 outcome.setOperation(null);
528 assertFalse(oper.isSameOperation(outcome));
529 outcome.setOperation(TARGET);
530 assertFalse(oper.isSameOperation(outcome));
531 outcome.setOperation(OPERATION);
// actor and operation both restored - should be true
533 assertTrue(oper.isSameOperation(outcome));
537 * Tests handleFailure() when the outcome is a success.
540 public void testHandlePreprocessorFailureTrue() {
// A successful guard outcome lets the operation run once and finish with SUCCESS.
541 oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
542 verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
546 * Tests handleFailure() when the outcome is <i>not</i> a success.
549 public void testHandlePreprocessorFailureFalse() throws Exception {
// A failed guard outcome is expected to give FAILURE_GUARD with zero operation invocations.
550 oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
551 verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
555 * Tests handleFailure() when the outcome is {@code null}.
558 public void testHandlePreprocessorFailureNull() throws Exception {
559 // arrange to return null from the preprocessor
560 oper.setGuard(CompletableFuture.completedFuture(null));
// A null guard outcome is expected to give FAILURE_GUARD with zero operation invocations.
562 verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
566 public void testFromException() {
567 // arrange to generate an exception when operation runs
568 oper.setGenException(true);
// Expects one callback, one operation invocation and a FAILURE_EXCEPTION result.
570 verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
574 * Tests fromException() when there is no exception.
577 public void testFromExceptionNoExcept() {
// Without an injected exception the run is expected to end in SUCCESS.
578 verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
582 * Tests both flavors of anyOf(), because one invokes the other.
585 public void testAnyOf() throws Exception {
// For each position of the single completed task, anyOf() must complete with the outcome of that task.
586 // first task completes, others do not
587 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
589 final OperationOutcome outcome = params.makeOutcome();
591 tasks.add(CompletableFuture.completedFuture(outcome));
592 tasks.add(new CompletableFuture<>());
593 tasks.add(new CompletableFuture<>());
595 CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
596 assertTrue(executor.runAll());
598 assertTrue(result.isDone());
599 assertSame(outcome, result.get());
601 // second task completes, others do not
602 tasks = new LinkedList<>();
604 tasks.add(new CompletableFuture<>());
605 tasks.add(CompletableFuture.completedFuture(outcome));
606 tasks.add(new CompletableFuture<>());
608 result = oper.anyOf(tasks);
609 assertTrue(executor.runAll());
611 assertTrue(result.isDone());
612 assertSame(outcome, result.get());
614 // third task completes, others do not
615 tasks = new LinkedList<>();
617 tasks.add(new CompletableFuture<>());
618 tasks.add(new CompletableFuture<>());
619 tasks.add(CompletableFuture.completedFuture(outcome));
621 result = oper.anyOf(tasks);
622 assertTrue(executor.runAll());
624 assertTrue(result.isDone());
625 assertSame(outcome, result.get());
629 * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
632 @SuppressWarnings("unchecked")
633 public void testAnyOfEdge() throws Exception {
634 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
636 // zero items: check both using a list and using an array
637 assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf(tasks));
638 assertThatIllegalArgumentException().isThrownBy(() -> oper.anyOf());
640 // one item: check both using a list and using an array
641 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
// With a single item, the very same future instance is expected back.
644 assertSame(future1, oper.anyOf(tasks));
645 assertSame(future1, oper.anyOf(future1));
649 * Tests both flavors of allOf(), because one invokes the other.
652 public void testAllOf() throws Exception {
// allOf() must stay incomplete until every one of the three task futures has completed.
653 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
655 final OperationOutcome outcome = params.makeOutcome();
657 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
658 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
659 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
665 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
// nothing completed yet
667 assertTrue(executor.runAll());
668 assertFalse(result.isDone());
669 future1.complete(outcome);
671 // complete 3 before 2
672 assertTrue(executor.runAll());
673 assertFalse(result.isDone());
674 future3.complete(outcome);
676 assertTrue(executor.runAll());
677 assertFalse(result.isDone());
678 future2.complete(outcome);
680 // all of them are now done
681 assertTrue(executor.runAll());
682 assertTrue(result.isDone());
683 assertSame(outcome, result.get());
687 * Tests both flavors of allOf(), for edge cases: zero items, and one item.
690 @SuppressWarnings("unchecked")
691 public void testAllOfEdge() throws Exception {
692 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
694 // zero items: check both using a list and using an array
695 assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf(tasks));
696 assertThatIllegalArgumentException().isThrownBy(() -> oper.allOf());
698 // one item: check both using a list and using an array
699 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
// With a single item, the very same future instance is expected back.
702 assertSame(future1, oper.allOf(tasks));
703 assertSame(future1, oper.allOf(future1));
707 public void testCombineOutcomes() throws Exception {
// The first argument of verifyOutcomes() is the index of the outcome expected to be selected.
709 verifyOutcomes(0, PolicyResult.SUCCESS);
710 verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
712 // maximum is in different positions
713 verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
714 verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
715 verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
// a single task that completes with a null outcome
718 final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
719 tasks.add(CompletableFuture.completedFuture(null));
720 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
722 assertTrue(executor.runAll());
723 assertTrue(result.isDone());
724 assertNull(result.get());
726 // one throws an exception during execution
727 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
730 tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
731 tasks.add(CompletableFuture.failedFuture(except));
732 tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
733 result = oper.allOf(tasks);
735 assertTrue(executor.runAll());
736 assertTrue(result.isCompletedExceptionally());
// NOTE(review): an assertion failure raised inside a whenComplete() action is captured by the
// returned stage rather than thrown to the caller, so this line cannot fail the test - confirm.
737 result.whenComplete((unused, thrown) -> assertSame(except, thrown));
// Builds one completed task per given result, combines them with allOf(), and asserts that the
// combined result is the outcome that was created at index "expected".
740 private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
741 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
744 OperationOutcome expectedOutcome = null;
746 for (int count = 0; count < results.length; ++count) {
747 OperationOutcome outcome = params.makeOutcome();
748 outcome.setResult(results[count]);
749 tasks.add(CompletableFuture.completedFuture(outcome));
// remember the outcome at the expected position for the identity check below
751 if (count == expected) {
752 expectedOutcome = outcome;
756 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
758 assertTrue(executor.runAll());
759 assertTrue(result.isDone());
760 assertSame(expectedOutcome, result.get());
// Returns a task function that ignores its input and yields an already-completed future holding taskOutcome.
763 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
764 final OperationOutcome taskOutcome) {
766 return outcome -> CompletableFuture.completedFuture(taskOutcome);
770 public void testDetmPriority() throws CoderException {
// A null outcome is expected to have priority 1.
771 assertEquals(1, oper.detmPriority(null));
773 OperationOutcome outcome = params.makeOutcome();
// Expected priority for each result value; the loop below checks every entry.
775 Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
776 PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
777 PolicyResult.FAILURE_EXCEPTION, 6);
779 for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
780 outcome.setResult(ent.getKey());
781 assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
// An outcome decoded with a null result is also expected to have priority 1.
785 * Test null result. We can't actually set it to null, because the set() method
786 * won't allow it. Instead, we decode it from a structure.
788 outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
789 assertEquals(1, oper.detmPriority(outcome));
793 * Tests doTask(Future) when the controller is not running.
796 public void testDoTaskFutureNotRunning() throws Exception {
797 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
// The controller is completed up front, so it is already done when doTask() is called.
799 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
800 controller.complete(params.makeOutcome());
802 CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, params.makeOutcome(), taskFuture);
803 assertFalse(future.isDone());
804 assertTrue(executor.runAll());
806 // should not have run the task
807 assertFalse(future.isDone());
809 // should have canceled the task future
810 assertTrue(taskFuture.isCancelled());
814 * Tests doTask(Future) when the previous outcome was successful.
817 public void testDoTaskFutureSuccess() throws Exception {
// The future returned by doTask() must complete with the task outcome while the controller stays incomplete.
818 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
819 final OperationOutcome taskOutcome = params.makeOutcome();
821 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
823 CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, params.makeOutcome(), taskFuture);
825 taskFuture.complete(taskOutcome);
826 assertTrue(executor.runAll());
828 assertTrue(future.isDone());
829 assertSame(taskOutcome, future.get());
831 // controller should not be done yet
832 assertFalse(controller.isDone());
836 * Tests doTask(Future) when the previous outcome was failed.
839 public void testDoTaskFutureFailure() throws Exception {
// A FAILURE previous outcome is passed with the boolean flag set to true.
840 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
841 final OperationOutcome failedOutcome = params.makeOutcome();
842 failedOutcome.setResult(PolicyResult.FAILURE);
844 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
846 CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, failedOutcome, taskFuture);
847 assertFalse(future.isDone());
848 assertTrue(executor.runAll());
850 // should not have run the task
851 assertFalse(future.isDone());
853 // should have canceled the task future
854 assertTrue(taskFuture.isCancelled());
856 // controller SHOULD be done now
857 assertTrue(controller.isDone());
858 assertSame(failedOutcome, controller.get());
862 * Tests doTask(Future) when the previous outcome was failed, but not checking
866 public void testDoTaskFutureUncheckedFailure() throws Exception {
// Same FAILURE previous outcome as the checked variant, but the boolean flag is false here.
867 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
868 final OperationOutcome failedOutcome = params.makeOutcome();
869 failedOutcome.setResult(PolicyResult.FAILURE);
871 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
873 CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, failedOutcome, taskFuture);
874 assertFalse(future.isDone());
// complete the task future so that its outcome can flow through
877 OperationOutcome taskOutcome = params.makeOutcome();
878 taskFuture.complete(taskOutcome);
880 assertTrue(executor.runAll());
882 // should have run the task
883 assertTrue(future.isDone());
// NOTE(review): this repeats the isDone() assertion immediately above; one of the two is redundant.
885 assertTrue(future.isDone());
886 assertSame(taskOutcome, future.get());
888 // controller should not be done yet
889 assertFalse(controller.isDone());
893 * Tests doTask(Function) when the controller is not running.
896 public void testDoTaskFunctionNotRunning() throws Exception {
// Flag checked at the end to prove that the task function was never called.
897 AtomicBoolean invoked = new AtomicBoolean();
899 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
901 return CompletableFuture.completedFuture(params.makeOutcome());
// The controller is completed up front, so it is already done when the wrapped task is applied.
904 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
905 controller.complete(params.makeOutcome());
907 CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(params.makeOutcome());
908 assertFalse(future.isDone());
909 assertTrue(executor.runAll());
911 // should not have run the task
912 assertFalse(future.isDone());
914 // should not have even invoked the task
915 assertFalse(invoked.get());
919 * Tests doTask(Function) when the previous outcome was successful.
922 public void testDoTaskFunctionSuccess() throws Exception {
923 final OperationOutcome taskOutcome = params.makeOutcome();
// NOTE(review): despite its name, no failure result is set on failedOutcome in the visible code.
925 final OperationOutcome failedOutcome = params.makeOutcome();
927 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
929 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
931 CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
// The wrapped task is expected to complete with the outcome supplied by makeTask().
933 assertTrue(future.isDone());
934 assertSame(taskOutcome, future.get());
936 // controller should not be done yet
937 assertFalse(controller.isDone());
941 * Tests doTask(Function) when the previous outcome was failed.
944 public void testDoTaskFunctionFailure() throws Exception {
// A FAILURE previous outcome is applied to a task wrapped with the boolean flag set to true.
945 final OperationOutcome failedOutcome = params.makeOutcome();
946 failedOutcome.setResult(PolicyResult.FAILURE);
948 AtomicBoolean invoked = new AtomicBoolean();
950 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
952 return CompletableFuture.completedFuture(params.makeOutcome());
955 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
957 CompletableFuture<OperationOutcome> future = oper.doTask(controller, true, task).apply(failedOutcome);
958 assertFalse(future.isDone());
959 assertTrue(executor.runAll());
961 // should not have run the task
962 assertFalse(future.isDone());
964 // should not have even invoked the task
965 assertFalse(invoked.get());
967 // controller should have the failed task
968 assertTrue(controller.isDone());
969 assertSame(failedOutcome, controller.get());
973 * Tests doTask(Function) when the previous outcome was failed, but not checking
977 public void testDoTaskFunctionUncheckedFailure() throws Exception {
// Same FAILURE previous outcome as the checked variant, but the boolean flag is false here.
978 final OperationOutcome taskOutcome = params.makeOutcome();
980 final OperationOutcome failedOutcome = params.makeOutcome();
981 failedOutcome.setResult(PolicyResult.FAILURE);
983 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
985 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
987 CompletableFuture<OperationOutcome> future = oper.doTask(controller, false, task).apply(failedOutcome);
// The task is expected to run anyway and deliver its own outcome.
989 assertTrue(future.isDone());
990 assertSame(taskOutcome, future.get());
992 // controller should not be done yet
993 assertFalse(controller.isDone());
997 * Tests callbackStarted() when the pipeline has already been stopped.
1000 public void testCallbackStartedNotRunning() {
// Holds the future returned by start() so that the start callback can cancel it.
1001 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1004 * arrange to stop the controller when the start-callback is invoked, but capture
1007 params = params.toBuilder().startCallback(oper -> {
1009 future.get().cancel(false);
1012 // new params, thus need a new operation
1013 oper = new MyOper();
1015 future.set(oper.start());
1016 assertTrue(executor.runAll());
1018 // should have only run once
1019 assertEquals(1, numStart);
1023 * Tests callbackCompleted() when the pipeline has already been stopped.
1026 public void testCallbackCompletedNotRunning() {
// Holds the future returned by start() so that the start callback can cancel it.
1027 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1029 // arrange to stop the controller when the start-callback is invoked
1030 params = params.toBuilder().startCallback(oper -> {
1031 future.get().cancel(false);
1034 // new params, thus need a new operation
1035 oper = new MyOper();
1037 future.set(oper.start());
1038 assertTrue(executor.runAll());
1040 // should not have been set
// no completion should have been counted after the cancel
1042 assertEquals(0, numEnd);
1046 public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
// A TimeoutException wrapped in a CompletionException, as an asynchronous stage would report it.
1047 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1049 OperationOutcome outcome;
// the wrapped timeout maps to FAILURE_TIMEOUT with the failure message
1051 outcome = new OperationOutcome();
1052 oper.setOutcome(outcome, timex);
1053 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1054 assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
// an IllegalStateException maps to FAILURE_EXCEPTION with the failure message
1056 outcome = new OperationOutcome();
1057 oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1058 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1059 assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1063 public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1064 OperationOutcome outcome;
// SUCCESS is expected to set the success message
1066 outcome = new OperationOutcome();
1067 oper.setOutcome(outcome, PolicyResult.SUCCESS);
1068 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1069 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
// every non-SUCCESS result is expected to set the failure message and keep the given result
1071 for (PolicyResult result : FAILURE_RESULTS) {
1072 outcome = new OperationOutcome();
1073 oper.setOutcome(outcome, result);
1074 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1075 assertEquals(result.toString(), result, outcome.getResult());
1080 public void testIsTimeout() {
1081 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
// not timeouts: other exception types, a null cause, and a timeout nested more than one level deep
1083 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1084 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1085 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1086 assertFalse(oper.isTimeout(new CompletionException(null)));
1087 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
// timeouts: a bare TimeoutException, or one wrapped directly in a single CompletionException
1089 assertTrue(oper.isTimeout(timex));
1090 assertTrue(oper.isTimeout(new CompletionException(timex)));
1094 public void testGetRetry() {
// a null retry count is expected to be treated as zero; a non-null value is returned as is
1095 assertEquals(0, oper.getRetry(null));
1096 assertEquals(10, oper.getRetry(10));
1100 public void testGetRetryWait() {
// Verifies the retry wait reported by an OperationPartial that does not
// override getRetryWaitMs(): it must equal DEFAULT_RETRY_WAIT_MS.
1101 // need an operation that doesn't override the retry time (MyOper overrides getRetryWaitMs())
1102 OperationPartial oper2 = new OperationPartial(params, operator) {};
1103 assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1107 public void testGetTimeOutMs() {
// getTimeoutMs() applied to the params' timeout (in seconds) is expected to
// yield TIMEOUT * 1000, and 0 once the timeout has been replaced by null.
1108 assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
// rebuild the params without a timeout
1110 params = params.toBuilder().timeoutSec(null).build();
1112 // new params, thus need a new operation
1113 oper = new MyOper();
1115 assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1118 private void starter(OperationOutcome oper) {
// NOTE(review): the parameter is an OperationOutcome but shares its name with
// the test's "oper" operation field, which it shadows inside this method.
// remember the outcome's start value; verifyRun() compares it via assertSame()
1120 tstart = oper.getStart();
1124 private void completer(OperationOutcome oper) {
// NOTE(review): as in starter(), the OperationOutcome parameter shadows the
// test's "oper" operation field - consider a more distinctive name.
1130 * Gets a function that does nothing.
1132 * @param <T> type of input parameter expected by the function
1133 * @return a function that does nothing
1135 private <T> Consumer<T> noop() {
// passed by the four-argument verifyRun() as its "manipulator" argument
1140 private OperationOutcome makeSuccess() {
// obtain a new outcome from the current params and mark it as SUCCESS
1141 OperationOutcome outcome = params.makeOutcome();
1142 outcome.setResult(PolicyResult.SUCCESS);
1147 private OperationOutcome makeFailure() {
// obtain a new outcome from the current params and mark it as FAILURE
1148 OperationOutcome outcome = params.makeOutcome();
1149 outcome.setResult(PolicyResult.FAILURE);
1157 * @param testName test name
1158 * @param expectedCallbacks number of callbacks expected
1159 * @param expectedOperations number of operation invocations expected
1160 * @param expectedResult expected outcome
1162 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1163 PolicyResult expectedResult) {
// Convenience overload: derives the expected sub request ID and delegates to
// the six-argument verifyRun() with a do-nothing manipulator.
// The ID is expected to be the operation count rendered as a string; for
// FAILURE_EXCEPTION it is expected to be null, because MyOper.doOperation()
// throws before it assigns the sub request ID.
1165 String expectedSubRequestId =
1166 (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1168 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1174 * @param testName test name
1175 * @param expectedCallbacks number of callbacks expected
1176 * @param expectedOperations number of operation invocations expected
1177 * @param expectedResult expected outcome
1178 * @param expectedSubRequestId expected sub request ID
1179 * @param manipulator function to modify the future returned by
1180 * {@link OperationPartial#start()} before the tasks
1181 * in the executor are run
1183 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1184 String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
// Starts the operation, hands the returned future to "manipulator", drains
// the executor and then checks the callback counts, the recorded outcomes,
// the future's value and the number of operation invocations.
// "testName" is used as the assertion message so failures identify the caller.
1186 CompletableFuture<OperationOutcome> future = oper.start();
// give the caller the chance to alter the future before any queued task runs
1188 manipulator.accept(future);
1190 assertTrue(testName, executor.runAll());
// start and end callbacks are expected to have fired equally often
1192 assertEquals(testName, expectedCallbacks, numStart);
1193 assertEquals(testName, expectedCallbacks, numEnd);
1195 if (expectedCallbacks > 0) {
1196 assertNotNull(testName, opstart);
1197 assertNotNull(testName, opend);
1198 assertEquals(testName, expectedResult, opend.getResult());
// both recorded outcomes must carry the very same start value (identity check)
1200 assertSame(testName, tstart, opstart.getStart());
1201 assertSame(testName, tstart, opend.getStart());
1204 assertTrue(future.isDone());
// the future must yield the very object stored in opend
1205 assertSame(testName, opend, future.get());
1207 } catch (InterruptedException | ExecutionException e) {
// NOTE(review): an InterruptedException is wrapped here without restoring the
// thread's interrupt flag (Thread.currentThread().interrupt()).
1208 throw new IllegalStateException(e);
// the sub request ID is only checked when at least one operation was expected
1211 if (expectedOperations > 0) {
1212 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
// the operation must have been invoked exactly the expected number of times
1216 assertEquals(testName, expectedOperations, oper.getCount());
1219 private class MyOper extends OperationPartial {
// Operation used by these tests; its behaviour is steered through the fields below.
// compared against maxFailures in doOperation() to decide between SUCCESS and FAILURE
1221 private int count = 0;
// NOTE(review): presumably makes doOperation() throw EXPECTED_EXCEPTION when set - confirm
1224 private boolean genException;
// doOperation() reports SUCCESS only when count exceeds this value
1227 private int maxFailures = 0;
// when non-null, returned by startGuardAsync() in place of the superclass result
1230 private CompletableFuture<OperationOutcome> guard;
// bind the operation to the enclosing test's current params and operator
1234 super(OperationPartialTest.this.params, operator);
1238 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
// Simulated operation: may throw, otherwise tags the outcome with the attempt
// number and reports SUCCESS or FAILURE depending on count versus maxFailures.
// thrown before the sub request ID is assigned, hence no ID in the exception case
1241 throw new IllegalStateException(EXPECTED_EXCEPTION);
// the attempt number doubles as the sub request ID checked by verifyRun()
1244 operation.setSubRequestId(String.valueOf(attempt));
// succeed only once count has exceeded the permitted number of failures
1246 if (count > maxFailures) {
1247 operation.setResult(PolicyResult.SUCCESS);
1249 operation.setResult(PolicyResult.FAILURE);
1256 protected CompletableFuture<OperationOutcome> startGuardAsync() {
// use the injected guard future if a test supplied one; otherwise defer to the superclass
1257 return (guard != null ? guard : super.startGuardAsync());
1261 protected long getRetryWaitMs() {
1263 * Sleep timers run in the background, but we want to control things via the
1264 * "executor", thus we avoid sleep timers altogether by simply returning 0.