2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertNull;
28 import static org.junit.Assert.assertSame;
29 import static org.junit.Assert.assertTrue;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.times;
33 import static org.mockito.Mockito.verify;
35 import java.time.Instant;
36 import java.util.Arrays;
37 import java.util.LinkedList;
38 import java.util.List;
40 import java.util.Map.Entry;
41 import java.util.Queue;
42 import java.util.TreeMap;
43 import java.util.UUID;
44 import java.util.concurrent.CompletableFuture;
45 import java.util.concurrent.CompletionException;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.Executor;
49 import java.util.concurrent.ForkJoinPool;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import java.util.concurrent.atomic.AtomicBoolean;
54 import java.util.concurrent.atomic.AtomicInteger;
55 import java.util.concurrent.atomic.AtomicReference;
56 import java.util.function.Consumer;
57 import java.util.function.Function;
58 import java.util.stream.Collectors;
61 import org.junit.Before;
62 import org.junit.Test;
63 import org.onap.policy.controlloop.ControlLoopOperation;
64 import org.onap.policy.controlloop.VirtualControlLoopEvent;
65 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
66 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
67 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
68 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
69 import org.onap.policy.controlloop.policy.PolicyResult;
71 public class OperatorPartialTest {
72 private static final int MAX_PARALLEL_REQUESTS = 10;
73 private static final String EXPECTED_EXCEPTION = "expected exception";
74 private static final String ACTOR = "my-actor";
75 private static final String OPERATOR = "my-operator";
76 private static final String TARGET = "my-target";
77 private static final int TIMEOUT = 1000;
78 private static final UUID REQ_ID = UUID.randomUUID();
80 private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
81 .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
83 private VirtualControlLoopEvent event;
84 private Map<String, Object> config;
85 private ControlLoopEventContext context;
86 private MyExec executor;
87 private ControlLoopOperationParams params;
94 private Instant tstart;
96 private OperationOutcome opstart;
97 private OperationOutcome opend;
100 * Initializes the fields, including {@link #oper}.
103 public void setUp() {
104 event = new VirtualControlLoopEvent();
105 event.setRequestId(REQ_ID);
107 config = new TreeMap<>();
108 context = new ControlLoopEventContext(event);
109 executor = new MyExec();
111 params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
112 .executor(executor).actor(ACTOR).operation(OPERATOR).timeoutSec(TIMEOUT)
113 .startCallback(this::starter).targetEntity(TARGET).build();
116 oper.configure(new TreeMap<>());
126 public void testOperatorPartial_testGetActorName_testGetName() {
127 assertEquals(ACTOR, oper.getActorName());
128 assertEquals(OPERATOR, oper.getName());
129 assertEquals(ACTOR + "." + OPERATOR, oper.getFullName());
133 public void testGetBlockingExecutor() throws InterruptedException {
134 CountDownLatch latch = new CountDownLatch(1);
137 * Use an operator that doesn't override getBlockingExecutor().
139 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {};
140 oper2.getBlockingExecutor().execute(() -> latch.countDown());
142 assertTrue(latch.await(5, TimeUnit.SECONDS));
146 public void testDoConfigure() {
147 oper = spy(new MyOper());
149 oper.configure(config);
150 verify(oper).configure(config);
152 // repeat - SHOULD be run again
153 oper.configure(config);
154 verify(oper, times(2)).configure(config);
158 public void testDoStart() {
159 oper = spy(new MyOper());
161 oper.configure(config);
164 verify(oper).doStart();
166 // others should not have been invoked
167 verify(oper, never()).doStop();
168 verify(oper, never()).doShutdown();
172 public void testDoStop() {
173 oper = spy(new MyOper());
175 oper.configure(config);
179 verify(oper).doStop();
181 // should not have been re-invoked
182 verify(oper).doStart();
184 // others should not have been invoked
185 verify(oper, never()).doShutdown();
189 public void testDoShutdown() {
190 oper = spy(new MyOper());
192 oper.configure(config);
196 verify(oper).doShutdown();
198 // should not have been re-invoked
199 verify(oper).doStart();
201 // others should not have been invoked
202 verify(oper, never()).doStop();
206 public void testStartOperation() {
207 verifyRun("testStartOperation", 1, 1, PolicyResult.SUCCESS);
211 * Tests startOperation() when the operator is not running.
214 public void testStartOperationNotRunning() {
215 // use a new operator, one that hasn't been started yet
217 oper.configure(new TreeMap<>());
219 assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
223 * Tests startOperation() when the operation has a preprocessor.
226 public void testStartOperationWithPreprocessor() {
227 AtomicInteger count = new AtomicInteger();
229 CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
230 count.incrementAndGet();
231 return makeSuccess();
234 oper.setPreProcessor(preproc);
236 verifyRun("testStartOperationWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
238 assertEquals(1, count.get());
242 * Tests startOperation() with multiple running requests.
245 public void testStartOperationMultiple() {
246 for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
247 oper.startOperation(params);
250 assertTrue(executor.runAll());
252 assertNotNull(opstart);
253 assertNotNull(opend);
254 assertEquals(PolicyResult.SUCCESS, opend.getResult());
256 assertEquals(MAX_PARALLEL_REQUESTS, numStart);
257 assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
258 assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
262 * Tests startPreprocessor() when the preprocessor returns a failure.
265 public void testStartPreprocessorFailure() {
266 oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure()));
268 verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
272 * Tests startPreprocessor() when the preprocessor throws an exception.
275 public void testStartPreprocessorException() {
276 // arrange for the preprocessor to throw an exception
277 oper.setPreProcessor(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
279 verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
283 * Tests startPreprocessor() when the pipeline is not running.
286 public void testStartPreprocessorNotRunning() {
287 // arrange for the preprocessor to return success, which will be ignored
288 oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess()));
290 oper.startOperation(params).cancel(false);
291 assertTrue(executor.runAll());
296 assertEquals(0, numStart);
297 assertEquals(0, oper.getCount());
298 assertEquals(0, numEnd);
302 * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
305 public void testStartPreprocessorBuilderException() {
306 oper = new MyOper() {
308 protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
309 throw new IllegalStateException(EXPECTED_EXCEPTION);
313 oper.configure(new TreeMap<>());
316 assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
318 // should be nothing in the queue
319 assertEquals(0, executor.getQueueLength());
323 public void testStartPreprocessorAsync() {
324 assertNull(oper.startPreprocessorAsync(params));
328 public void testStartOperationAsync() {
329 oper.startOperation(params);
330 assertTrue(executor.runAll());
332 assertEquals(1, oper.getCount());
336 public void testIsSuccess() {
337 OperationOutcome outcome = new OperationOutcome();
339 outcome.setResult(PolicyResult.SUCCESS);
340 assertTrue(oper.isSuccess(outcome));
342 for (PolicyResult failure : FAILURE_RESULTS) {
343 outcome.setResult(failure);
344 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
349 public void testIsActorFailed() {
350 assertFalse(oper.isActorFailed(null));
352 OperationOutcome outcome = params.makeOutcome();
355 outcome.setResult(PolicyResult.SUCCESS);
356 assertFalse(oper.isActorFailed(outcome));
358 outcome.setResult(PolicyResult.FAILURE_RETRIES);
359 assertFalse(oper.isActorFailed(outcome));
362 outcome.setResult(PolicyResult.FAILURE);
365 outcome.setActor(TARGET);
366 assertFalse(oper.isActorFailed(outcome));
367 outcome.setActor(null);
368 assertFalse(oper.isActorFailed(outcome));
369 outcome.setActor(ACTOR);
371 // incorrect operation
372 outcome.setOperation(TARGET);
373 assertFalse(oper.isActorFailed(outcome));
374 outcome.setOperation(null);
375 assertFalse(oper.isActorFailed(outcome));
376 outcome.setOperation(OPERATOR);
379 assertTrue(oper.isActorFailed(outcome));
383 public void testDoOperation() {
385 * Use an operator that doesn't override doOperation().
387 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {
389 protected Executor getBlockingExecutor() {
394 oper2.configure(new TreeMap<>());
397 oper2.startOperation(params);
398 assertTrue(executor.runAll());
400 assertNotNull(opend);
401 assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
405 public void testTimeout() throws Exception {
407 // use a real executor
408 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
410 // trigger timeout very quickly
411 oper = new MyOper() {
413 protected long getTimeOutMillis(Integer timeoutSec) {
418 protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params,
419 int attempt, OperationOutcome outcome) {
421 OperationOutcome outcome2 = params.makeOutcome();
422 outcome2.setResult(PolicyResult.SUCCESS);
425 * Create an incomplete future that will timeout after the operation's
426 * timeout. If it fires before the other timer, then it will return a
429 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
430 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
431 params.getExecutor());
437 oper.configure(new TreeMap<>());
440 assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.startOperation(params).get().getResult());
444 * Verifies that the timer doesn't encompass the preprocessor and doesn't stop the
445 * operation once the preprocessor completes.
448 public void testTimeoutInPreprocessor() throws Exception {
450 // use a real executor
451 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
453 // trigger timeout very quickly
454 oper = new MyOper() {
456 protected long getTimeOutMillis(Integer timeoutSec) {
461 protected Executor getBlockingExecutor() {
463 Thread thread = new Thread(command);
469 protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
471 OperationOutcome outcome = makeSuccess();
474 * Create an incomplete future that will timeout after the operation's
475 * timeout. If it fires before the other timer, then it will return a
478 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
479 future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome,
480 params.getExecutor());
486 oper.configure(new TreeMap<>());
489 OperationOutcome result = oper.startOperation(params).get();
490 assertEquals(PolicyResult.SUCCESS, result.getResult());
492 assertNotNull(opstart);
493 assertNotNull(opend);
494 assertEquals(PolicyResult.SUCCESS, opend.getResult());
496 assertEquals(1, numStart);
497 assertEquals(1, oper.getCount());
498 assertEquals(1, numEnd);
502 * Tests retry functions, when the count is set to zero and retries are exhausted.
505 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
506 params = params.toBuilder().retry(0).build();
507 oper.setMaxFailures(10);
509 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
513 * Tests retry functions, when the count is null and retries are exhausted.
516 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
517 params = params.toBuilder().retry(null).build();
518 oper.setMaxFailures(10);
520 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
524 * Tests retry functions, when retries are exhausted.
527 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
528 final int maxRetries = 3;
529 params = params.toBuilder().retry(maxRetries).build();
530 oper.setMaxFailures(10);
532 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
533 PolicyResult.FAILURE_RETRIES);
537 * Tests retry functions, when a success follows some retries.
540 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
541 params = params.toBuilder().retry(10).build();
543 final int maxFailures = 3;
544 oper.setMaxFailures(maxFailures);
546 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
547 PolicyResult.SUCCESS);
551 * Tests retry functions, when the outcome is {@code null}.
554 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
556 // arrange to return null from doOperation()
557 oper = new MyOper() {
559 protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt,
560 OperationOutcome operation) {
563 super.doOperation(params, attempt, operation);
568 oper.configure(new TreeMap<>());
571 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
575 public void testIsSameOperation() {
576 assertFalse(oper.isSameOperation(null));
578 OperationOutcome outcome = params.makeOutcome();
580 // wrong actor - should be false
581 outcome.setActor(null);
582 assertFalse(oper.isSameOperation(outcome));
583 outcome.setActor(TARGET);
584 assertFalse(oper.isSameOperation(outcome));
585 outcome.setActor(ACTOR);
587 // wrong operation - should be null
588 outcome.setOperation(null);
589 assertFalse(oper.isSameOperation(outcome));
590 outcome.setOperation(TARGET);
591 assertFalse(oper.isSameOperation(outcome));
592 outcome.setOperation(OPERATOR);
594 assertTrue(oper.isSameOperation(outcome));
598 * Tests handleFailure() when the outcome is a success.
601 public void testHandlePreprocessorFailureTrue() {
602 oper.setPreProcessor(CompletableFuture.completedFuture(makeSuccess()));
603 verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
607 * Tests handleFailure() when the outcome is <i>not</i> a success.
610 public void testHandlePreprocessorFailureFalse() throws Exception {
611 oper.setPreProcessor(CompletableFuture.completedFuture(makeFailure()));
612 verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
616 * Tests handleFailure() when the outcome is {@code null}.
619 public void testHandlePreprocessorFailureNull() throws Exception {
620 // arrange to return null from the preprocessor
621 oper.setPreProcessor(CompletableFuture.completedFuture(null));
623 verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
627 public void testFromException() {
628 // arrange to generate an exception when operation runs
629 oper.setGenException(true);
631 verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
635 * Tests fromException() when there is no exception.
638 public void testFromExceptionNoExcept() {
639 verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
643 * Tests both flavors of anyOf(), because one invokes the other.
646 public void testAnyOf() throws Exception {
647 // first task completes, others do not
648 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
650 final OperationOutcome outcome = params.makeOutcome();
652 tasks.add(CompletableFuture.completedFuture(outcome));
653 tasks.add(new CompletableFuture<>());
654 tasks.add(new CompletableFuture<>());
656 CompletableFuture<OperationOutcome> result = oper.anyOf(params, tasks);
657 assertTrue(executor.runAll());
659 assertTrue(result.isDone());
660 assertSame(outcome, result.get());
662 // second task completes, others do not
663 tasks = new LinkedList<>();
665 tasks.add(new CompletableFuture<>());
666 tasks.add(CompletableFuture.completedFuture(outcome));
667 tasks.add(new CompletableFuture<>());
669 result = oper.anyOf(params, tasks);
670 assertTrue(executor.runAll());
672 assertTrue(result.isDone());
673 assertSame(outcome, result.get());
675 // third task completes, others do not
676 tasks = new LinkedList<>();
678 tasks.add(new CompletableFuture<>());
679 tasks.add(new CompletableFuture<>());
680 tasks.add(CompletableFuture.completedFuture(outcome));
682 result = oper.anyOf(params, tasks);
683 assertTrue(executor.runAll());
685 assertTrue(result.isDone());
686 assertSame(outcome, result.get());
690 * Tests both flavors of allOf(), because one invokes the other.
693 public void testAllOf() throws Exception {
694 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
696 final OperationOutcome outcome = params.makeOutcome();
698 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
699 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
700 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
706 CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
708 assertTrue(executor.runAll());
709 assertFalse(result.isDone());
710 future1.complete(outcome);
712 // complete 3 before 2
713 assertTrue(executor.runAll());
714 assertFalse(result.isDone());
715 future3.complete(outcome);
717 assertTrue(executor.runAll());
718 assertFalse(result.isDone());
719 future2.complete(outcome);
721 // all of them are now done
722 assertTrue(executor.runAll());
723 assertTrue(result.isDone());
724 assertSame(outcome, result.get());
728 public void testCombineOutcomes() throws Exception {
730 verifyOutcomes(0, PolicyResult.SUCCESS);
731 verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
733 // maximum is in different positions
734 verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
735 verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
736 verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
739 final List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
740 tasks.add(CompletableFuture.completedFuture(null));
741 CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
743 assertTrue(executor.runAll());
744 assertTrue(result.isDone());
745 assertNull(result.get());
747 // one throws an exception during execution
748 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
751 tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
752 tasks.add(CompletableFuture.failedFuture(except));
753 tasks.add(CompletableFuture.completedFuture(params.makeOutcome()));
754 result = oper.allOf(params, tasks);
756 assertTrue(executor.runAll());
757 assertTrue(result.isCompletedExceptionally());
758 result.whenComplete((unused, thrown) -> assertSame(except, thrown));
761 private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
762 List<CompletableFuture<OperationOutcome>> tasks = new LinkedList<>();
765 OperationOutcome expectedOutcome = null;
767 for (int count = 0; count < results.length; ++count) {
768 OperationOutcome outcome = params.makeOutcome();
769 outcome.setResult(results[count]);
770 tasks.add(CompletableFuture.completedFuture(outcome));
772 if (count == expected) {
773 expectedOutcome = outcome;
777 CompletableFuture<OperationOutcome> result = oper.allOf(params, tasks);
779 assertTrue(executor.runAll());
780 assertTrue(result.isDone());
781 assertSame(expectedOutcome, result.get());
784 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> makeTask(
785 final OperationOutcome taskOutcome) {
787 return outcome -> CompletableFuture.completedFuture(taskOutcome);
791 public void testDetmPriority() {
792 assertEquals(1, oper.detmPriority(null));
794 OperationOutcome outcome = params.makeOutcome();
796 Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
797 PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
798 PolicyResult.FAILURE_EXCEPTION, 6);
800 for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
801 outcome.setResult(ent.getKey());
802 assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
807 * Tests doTask(Future) when the controller is not running.
810 public void testDoTaskFutureNotRunning() throws Exception {
811 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
813 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
814 controller.complete(params.makeOutcome());
816 CompletableFuture<OperationOutcome> future =
817 oper.doTask(params, controller, false, params.makeOutcome(), taskFuture);
818 assertFalse(future.isDone());
819 assertTrue(executor.runAll());
821 // should not have run the task
822 assertFalse(future.isDone());
824 // should have canceled the task future
825 assertTrue(taskFuture.isCancelled());
829 * Tests doTask(Future) when the previous outcome was successful.
832 public void testDoTaskFutureSuccess() throws Exception {
833 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
834 final OperationOutcome taskOutcome = params.makeOutcome();
836 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
838 CompletableFuture<OperationOutcome> future =
839 oper.doTask(params, controller, true, params.makeOutcome(), taskFuture);
841 taskFuture.complete(taskOutcome);
842 assertTrue(executor.runAll());
844 assertTrue(future.isDone());
845 assertSame(taskOutcome, future.get());
847 // controller should not be done yet
848 assertFalse(controller.isDone());
852 * Tests doTask(Future) when the previous outcome was failed.
855 public void testDoTaskFutureFailure() throws Exception {
856 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
857 final OperationOutcome failedOutcome = params.makeOutcome();
858 failedOutcome.setResult(PolicyResult.FAILURE);
860 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
862 CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, failedOutcome, taskFuture);
863 assertFalse(future.isDone());
864 assertTrue(executor.runAll());
866 // should not have run the task
867 assertFalse(future.isDone());
869 // should have canceled the task future
870 assertTrue(taskFuture.isCancelled());
872 // controller SHOULD be done now
873 assertTrue(controller.isDone());
874 assertSame(failedOutcome, controller.get());
878 * Tests doTask(Future) when the previous outcome was failed, but not checking
882 public void testDoTaskFutureUncheckedFailure() throws Exception {
883 CompletableFuture<OperationOutcome> taskFuture = new CompletableFuture<>();
884 final OperationOutcome failedOutcome = params.makeOutcome();
885 failedOutcome.setResult(PolicyResult.FAILURE);
887 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
889 CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, failedOutcome, taskFuture);
890 assertFalse(future.isDone());
893 OperationOutcome taskOutcome = params.makeOutcome();
894 taskFuture.complete(taskOutcome);
896 assertTrue(executor.runAll());
898 // should have run the task
899 assertTrue(future.isDone());
901 assertTrue(future.isDone());
902 assertSame(taskOutcome, future.get());
904 // controller should not be done yet
905 assertFalse(controller.isDone());
909 * Tests doTask(Function) when the controller is not running.
912 public void testDoTaskFunctionNotRunning() throws Exception {
913 AtomicBoolean invoked = new AtomicBoolean();
915 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
917 return CompletableFuture.completedFuture(params.makeOutcome());
920 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
921 controller.complete(params.makeOutcome());
923 CompletableFuture<OperationOutcome> future =
924 oper.doTask(params, controller, false, task).apply(params.makeOutcome());
925 assertFalse(future.isDone());
926 assertTrue(executor.runAll());
928 // should not have run the task
929 assertFalse(future.isDone());
931 // should not have even invoked the task
932 assertFalse(invoked.get());
936 * Tests doTask(Function) when the previous outcome was successful.
939 public void testDoTaskFunctionSuccess() throws Exception {
940 final OperationOutcome taskOutcome = params.makeOutcome();
942 final OperationOutcome failedOutcome = params.makeOutcome();
944 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
946 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
948 CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome);
950 assertTrue(future.isDone());
951 assertSame(taskOutcome, future.get());
953 // controller should not be done yet
954 assertFalse(controller.isDone());
958 * Tests doTask(Function) when the previous outcome was failed.
961 public void testDoTaskFunctionFailure() throws Exception {
962 final OperationOutcome failedOutcome = params.makeOutcome();
963 failedOutcome.setResult(PolicyResult.FAILURE);
965 AtomicBoolean invoked = new AtomicBoolean();
967 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = outcome -> {
969 return CompletableFuture.completedFuture(params.makeOutcome());
972 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
974 CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, true, task).apply(failedOutcome);
975 assertFalse(future.isDone());
976 assertTrue(executor.runAll());
978 // should not have run the task
979 assertFalse(future.isDone());
981 // should not have even invoked the task
982 assertFalse(invoked.get());
984 // controller should have the failed task
985 assertTrue(controller.isDone());
986 assertSame(failedOutcome, controller.get());
990 * Tests doTask(Function) when the previous outcome was failed, but not checking
994 public void testDoTaskFunctionUncheckedFailure() throws Exception {
995 final OperationOutcome taskOutcome = params.makeOutcome();
997 final OperationOutcome failedOutcome = params.makeOutcome();
998 failedOutcome.setResult(PolicyResult.FAILURE);
1000 Function<OperationOutcome, CompletableFuture<OperationOutcome>> task = makeTask(taskOutcome);
1002 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
1004 CompletableFuture<OperationOutcome> future = oper.doTask(params, controller, false, task).apply(failedOutcome);
1006 assertTrue(future.isDone());
1007 assertSame(taskOutcome, future.get());
1009 // controller should not be done yet
1010 assertFalse(controller.isDone());
1014 * Tests callbackStarted() when the pipeline has already been stopped.
1017 public void testCallbackStartedNotRunning() {
1018 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1021 * arrange to stop the controller when the start-callback is invoked, but capture
1024 params = params.toBuilder().startCallback(oper -> {
1026 future.get().cancel(false);
1029 future.set(oper.startOperation(params));
1030 assertTrue(executor.runAll());
1032 // should have only run once
1033 assertEquals(1, numStart);
1037 * Tests callbackCompleted() when the pipeline has already been stopped.
1040 public void testCallbackCompletedNotRunning() {
1041 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1043 // arrange to stop the controller when the start-callback is invoked
1044 params = params.toBuilder().startCallback(oper -> {
1045 future.get().cancel(false);
1048 future.set(oper.startOperation(params));
1049 assertTrue(executor.runAll());
1051 // should not have been set
1053 assertEquals(0, numEnd);
1057 public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1058 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1060 OperationOutcome outcome;
1062 outcome = new OperationOutcome();
1063 oper.setOutcome(params, outcome, timex);
1064 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1065 assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1067 outcome = new OperationOutcome();
1068 oper.setOutcome(params, outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1069 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1070 assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1074 public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1075 OperationOutcome outcome;
1077 outcome = new OperationOutcome();
1078 oper.setOutcome(params, outcome, PolicyResult.SUCCESS);
1079 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1080 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1082 for (PolicyResult result : FAILURE_RESULTS) {
1083 outcome = new OperationOutcome();
1084 oper.setOutcome(params, outcome, result);
1085 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1086 assertEquals(result.toString(), result, outcome.getResult());
1091 public void testIsTimeout() {
1092 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1094 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1095 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1096 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1097 assertFalse(oper.isTimeout(new CompletionException(null)));
1098 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1100 assertTrue(oper.isTimeout(timex));
1101 assertTrue(oper.isTimeout(new CompletionException(timex)));
1105 public void testGetTimeOutMillis() {
1106 assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(params.getTimeoutSec()));
1108 params = params.toBuilder().timeoutSec(null).build();
1109 assertEquals(0, oper.getTimeOutMillis(params.getTimeoutSec()));
1112 private void starter(OperationOutcome oper) {
1114 tstart = oper.getStart();
1118 private void completer(OperationOutcome oper) {
1124 * Gets a function that does nothing.
1126 * @param <T> type of input parameter expected by the function
1127 * @return a function that does nothing
1129 private <T> Consumer<T> noop() {
1134 private OperationOutcome makeSuccess() {
1135 OperationOutcome outcome = params.makeOutcome();
1136 outcome.setResult(PolicyResult.SUCCESS);
1141 private OperationOutcome makeFailure() {
1142 OperationOutcome outcome = params.makeOutcome();
1143 outcome.setResult(PolicyResult.FAILURE);
1151 * @param testName test name
1152 * @param expectedCallbacks number of callbacks expected
1153 * @param expectedOperations number of operation invocations expected
1154 * @param expectedResult expected outcome
1156 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1157 PolicyResult expectedResult) {
1159 String expectedSubRequestId =
1160 (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1162 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1168 * @param testName test name
1169 * @param expectedCallbacks number of callbacks expected
1170 * @param expectedOperations number of operation invocations expected
1171 * @param expectedResult expected outcome
1172 * @param expectedSubRequestId expected sub request ID
1173 * @param manipulator function to modify the future returned by
1174 * {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before
1175 * the tasks in the executor are run
1177 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1178 String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1180 CompletableFuture<OperationOutcome> future = oper.startOperation(params);
1182 manipulator.accept(future);
1184 assertTrue(testName, executor.runAll());
1186 assertEquals(testName, expectedCallbacks, numStart);
1187 assertEquals(testName, expectedCallbacks, numEnd);
1189 if (expectedCallbacks > 0) {
1190 assertNotNull(testName, opstart);
1191 assertNotNull(testName, opend);
1192 assertEquals(testName, expectedResult, opend.getResult());
1194 assertSame(testName, tstart, opstart.getStart());
1195 assertSame(testName, tstart, opend.getStart());
1198 assertTrue(future.isDone());
1199 assertSame(testName, opend, future.get());
1201 } catch (InterruptedException | ExecutionException e) {
1202 throw new IllegalStateException(e);
1205 if (expectedOperations > 0) {
1206 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1210 assertEquals(testName, expectedOperations, oper.getCount());
1213 private class MyOper extends OperatorPartial {
1215 private int count = 0;
1218 private boolean genException;
1221 private int maxFailures = 0;
1224 private CompletableFuture<OperationOutcome> preProcessor;
1227 super(ACTOR, OPERATOR);
1231 protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt,
1232 OperationOutcome operation) {
1235 throw new IllegalStateException(EXPECTED_EXCEPTION);
1238 operation.setSubRequestId(String.valueOf(attempt));
1240 if (count > maxFailures) {
1241 operation.setResult(PolicyResult.SUCCESS);
1243 operation.setResult(PolicyResult.FAILURE);
1250 protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
1251 return (preProcessor != null ? preProcessor : super.startPreprocessorAsync(params));
1255 protected Executor getBlockingExecutor() {
1261 * Executor that will run tasks until the queue is empty or a maximum number of tasks
1262 * have been executed.
1264 private static class MyExec implements Executor {
1265 private static final int MAX_TASKS = MAX_PARALLEL_REQUESTS * 100;
1267 private Queue<Runnable> commands = new LinkedList<>();
1273 public int getQueueLength() {
1274 return commands.size();
1278 public void execute(Runnable command) {
1279 commands.add(command);
1282 public boolean runAll() {
1283 for (int count = 0; count < MAX_TASKS && !commands.isEmpty(); ++count) {
1284 commands.remove().run();
1287 return commands.isEmpty();