2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotNull;
27 import static org.junit.Assert.assertNull;
28 import static org.junit.Assert.assertSame;
29 import static org.junit.Assert.assertTrue;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
34 import java.time.Instant;
35 import java.util.Arrays;
36 import java.util.LinkedList;
37 import java.util.List;
39 import java.util.Queue;
40 import java.util.TreeMap;
41 import java.util.UUID;
42 import java.util.concurrent.CompletableFuture;
43 import java.util.concurrent.CompletionException;
44 import java.util.concurrent.ExecutionException;
45 import java.util.concurrent.Executor;
46 import java.util.concurrent.ForkJoinPool;
47 import java.util.concurrent.Future;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.TimeoutException;
50 import java.util.concurrent.atomic.AtomicInteger;
51 import java.util.concurrent.atomic.AtomicReference;
52 import java.util.function.Consumer;
53 import java.util.function.Function;
54 import java.util.stream.Collectors;
57 import org.junit.Before;
58 import org.junit.Test;
59 import org.onap.policy.controlloop.ControlLoopOperation;
60 import org.onap.policy.controlloop.VirtualControlLoopEvent;
61 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
62 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
63 import org.onap.policy.controlloop.policy.Policy;
64 import org.onap.policy.controlloop.policy.PolicyResult;
66 public class OperatorPartialTest {
67 private static final int MAX_PARALLEL_REQUESTS = 10;
68 private static final String EXPECTED_EXCEPTION = "expected exception";
69 private static final String ACTOR = "my-actor";
70 private static final String OPERATOR = "my-operator";
71 private static final String TARGET = "my-target";
72 private static final int TIMEOUT = 1000;
73 private static final UUID REQ_ID = UUID.randomUUID();
75 private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
76 .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
78 private static final List<String> FAILURE_STRINGS =
79 FAILURE_RESULTS.stream().map(Object::toString).collect(Collectors.toList());
81 private VirtualControlLoopEvent event;
82 private Map<String, Object> config;
83 private ControlLoopEventContext context;
84 private MyExec executor;
85 private Policy policy;
86 private ControlLoopOperationParams params;
93 private Instant tstart;
95 private ControlLoopOperation opstart;
96 private ControlLoopOperation opend;
99 * Initializes the fields, including {@link #oper}.
102 public void setUp() {
103 event = new VirtualControlLoopEvent();
104 event.setRequestId(REQ_ID);
106 config = new TreeMap<>();
107 context = new ControlLoopEventContext(event);
108 executor = new MyExec();
110 policy = new Policy();
111 policy.setActor(ACTOR);
112 policy.setRecipe(OPERATOR);
113 policy.setTimeout(TIMEOUT);
115 params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
116 .executor(executor).policy(policy).startCallback(this::starter).target(TARGET).build();
119 oper.configure(new TreeMap<>());
129 public void testOperatorPartial_testGetActorName_testGetName() {
130 assertEquals(ACTOR, oper.getActorName());
131 assertEquals(OPERATOR, oper.getName());
132 assertEquals(ACTOR + "." + OPERATOR, oper.getFullName());
136 public void testDoStart() {
137 oper = spy(new MyOper());
139 oper.configure(config);
142 verify(oper).doStart();
144 // others should not have been invoked
145 verify(oper, never()).doStop();
146 verify(oper, never()).doShutdown();
150 public void testDoStop() {
151 oper = spy(new MyOper());
153 oper.configure(config);
157 verify(oper).doStop();
159 // should not have been re-invoked
160 verify(oper).doStart();
162 // others should not have been invoked
163 verify(oper, never()).doShutdown();
167 public void testDoShutdown() {
168 oper = spy(new MyOper());
170 oper.configure(config);
174 verify(oper).doShutdown();
176 // should not have been re-invoked
177 verify(oper).doStart();
179 // others should not have been invoked
180 verify(oper, never()).doStop();
184 public void testStartOperation_testVerifyRunning() {
185 verifyRun("testStartOperation", 1, 1, PolicyResult.SUCCESS);
189 * Tests startOperation() when the operator is not running.
192 public void testStartOperationNotRunning() {
193 // use a new operator, one that hasn't been started yet
195 oper.configure(new TreeMap<>());
197 assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
201 * Tests startOperation() when the operation has a preprocessor.
204 public void testStartOperationWithPreprocessor_testStartPreprocessor() {
205 AtomicInteger count = new AtomicInteger();
208 Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preproc =
209 oper -> CompletableFuture.supplyAsync(() -> {
210 count.incrementAndGet();
211 oper.setOutcome(PolicyResult.SUCCESS.toString());
216 oper.setPreProcessor(preproc);
218 verifyRun("testStartOperationWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
220 assertEquals(1, count.get());
224 * Tests startOperation() with multiple running requests.
227 public void testStartOperationMultiple() {
228 for (int count = 0; count < MAX_PARALLEL_REQUESTS; ++count) {
229 oper.startOperation(params);
232 assertTrue(executor.runAll());
234 assertNotNull(opstart);
235 assertNotNull(opend);
236 assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome());
238 assertEquals(MAX_PARALLEL_REQUESTS, numStart);
239 assertEquals(MAX_PARALLEL_REQUESTS, oper.getCount());
240 assertEquals(MAX_PARALLEL_REQUESTS, numEnd);
244 * Tests startPreprocessor() when the preprocessor returns a failure.
247 public void testStartPreprocessorFailure() {
248 // arrange for the preprocessor to return a failure
249 oper.setPreProcessor(oper -> {
250 oper.setOutcome(PolicyResult.FAILURE_GUARD.toString());
251 return CompletableFuture.completedFuture(oper);
254 verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
258 * Tests startPreprocessor() when the preprocessor throws an exception.
261 public void testStartPreprocessorException() {
262 // arrange for the preprocessor to throw an exception
263 oper.setPreProcessor(oper -> {
264 throw new IllegalStateException(EXPECTED_EXCEPTION);
267 verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
271 * Tests startPreprocessor() when the pipeline is not running.
274 public void testStartPreprocessorNotRunning() {
275 // arrange for the preprocessor to return success, which will be ignored
276 oper.setPreProcessor(oper -> {
277 oper.setOutcome(PolicyResult.SUCCESS.toString());
278 return CompletableFuture.completedFuture(oper);
281 oper.startOperation(params).cancel(false);
282 assertTrue(executor.runAll());
287 assertEquals(0, numStart);
288 assertEquals(0, oper.getCount());
289 assertEquals(0, numEnd);
293 * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
296 public void testStartPreprocessorBuilderException() {
297 oper = new MyOper() {
299 protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
300 ControlLoopOperationParams params) {
301 throw new IllegalStateException(EXPECTED_EXCEPTION);
305 oper.configure(new TreeMap<>());
308 assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
310 // should be nothing in the queue
311 assertEquals(0, executor.getQueueLength());
315 public void testDoPreprocessorAsFuture() {
316 assertNull(oper.doPreprocessorAsFuture(params));
320 public void testStartOperationOnly_testDoOperationAsFuture() {
321 oper.startOperation(params);
322 assertTrue(executor.runAll());
324 assertEquals(1, oper.getCount());
328 * Tests startOperationOnce() when
329 * {@link OperatorPartial#doOperationAsFuture(ControlLoopOperationParams, int)} throws an exception.
333 public void testStartOperationOnceBuilderException() {
334 oper = new MyOper() {
336 protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
337 ControlLoopOperationParams params, int attempt) {
338 throw new IllegalStateException(EXPECTED_EXCEPTION);
342 oper.configure(new TreeMap<>());
345 assertThatIllegalStateException().isThrownBy(() -> oper.startOperation(params));
347 // should be nothing in the queue
348 assertEquals(0, executor.getQueueLength());
352 public void testIsSuccess() {
353 ControlLoopOperation outcome = new ControlLoopOperation();
355 outcome.setOutcome(PolicyResult.SUCCESS.toString());
356 assertTrue(oper.isSuccess(outcome));
358 for (String failure : FAILURE_STRINGS) {
359 outcome.setOutcome(failure);
360 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
365 public void testIsActorFailed() {
366 assertFalse(oper.isActorFailed(null));
368 ControlLoopOperation outcome = params.makeOutcome();
371 outcome.setOutcome(PolicyResult.SUCCESS.toString());
372 assertFalse(oper.isActorFailed(outcome));
374 outcome.setOutcome(PolicyResult.FAILURE_RETRIES.toString());
375 assertFalse(oper.isActorFailed(outcome));
378 outcome.setOutcome(PolicyResult.FAILURE.toString());
381 outcome.setActor(TARGET);
382 assertFalse(oper.isActorFailed(outcome));
383 outcome.setActor(null);
384 assertFalse(oper.isActorFailed(outcome));
385 outcome.setActor(ACTOR);
387 // incorrect operation
388 outcome.setOperation(TARGET);
389 assertFalse(oper.isActorFailed(outcome));
390 outcome.setOperation(null);
391 assertFalse(oper.isActorFailed(outcome));
392 outcome.setOperation(OPERATOR);
395 assertTrue(oper.isActorFailed(outcome));
399 public void testDoOperation() {
401 * Use an operator that doesn't override doOperation().
403 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATOR) {};
405 oper2.configure(new TreeMap<>());
408 oper2.startOperation(params);
409 assertTrue(executor.runAll());
411 assertNotNull(opend);
412 assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), opend.getOutcome());
416 public void testTimeout() throws Exception {
418 // use a real executor
419 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
421 // trigger timeout very quickly
422 oper = new MyOper() {
424 protected long getTimeOutMillis(Policy policy) {
429 protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doOperationAsFuture(
430 ControlLoopOperationParams params, int attempt) {
433 ControlLoopOperation outcome2 = params.makeOutcome();
434 outcome2.setOutcome(PolicyResult.SUCCESS.toString());
437 * Create an incomplete future that will timeout after the operation's
438 * timeout. If it fires before the other timer, then it will return a
441 CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>();
442 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
443 params.getExecutor());
450 oper.configure(new TreeMap<>());
453 assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), oper.startOperation(params).get().getOutcome());
457 * Verifies that the timer doesn't encompass the preprocessor and doesn't stop the
458 * operation once the preprocessor completes.
461 public void testTimeoutInPreprocessor() throws Exception {
463 // use a real executor
464 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
466 // trigger timeout very quickly
467 oper = new MyOper() {
469 protected long getTimeOutMillis(Policy policy) {
474 protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
475 ControlLoopOperationParams params) {
478 outcome.setOutcome(PolicyResult.SUCCESS.toString());
481 * Create an incomplete future that will timeout after the operation's
482 * timeout. If it fires before the other timer, then it will return a
485 CompletableFuture<ControlLoopOperation> future = new CompletableFuture<>();
486 future = future.orTimeout(200, TimeUnit.MILLISECONDS).handleAsync((unused1, unused2) -> outcome,
487 params.getExecutor());
494 oper.configure(new TreeMap<>());
497 ControlLoopOperation result = oper.startOperation(params).get();
498 assertEquals(PolicyResult.SUCCESS.toString(), result.getOutcome());
500 assertNotNull(opstart);
501 assertNotNull(opend);
502 assertEquals(PolicyResult.SUCCESS.toString(), opend.getOutcome());
504 assertEquals(1, numStart);
505 assertEquals(1, oper.getCount());
506 assertEquals(1, numEnd);
510 * Tests retry functions, when the count is set to zero and retries are exhausted.
513 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries() {
515 oper.setMaxFailures(10);
517 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
521 * Tests retry functions, when the count is null and retries are exhausted.
524 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
525 policy.setRetry(null);
526 oper.setMaxFailures(10);
528 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
532 * Tests retry functions, when retries are exhausted.
535 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
536 final int maxRetries = 3;
537 policy.setRetry(maxRetries);
538 oper.setMaxFailures(10);
540 verifyRun("testVerifyRunningWhenNot", maxRetries + 1, maxRetries + 1, PolicyResult.FAILURE_RETRIES);
544 * Tests retry functions, when a success follows some retries.
547 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
550 final int maxFailures = 3;
551 oper.setMaxFailures(maxFailures);
553 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
554 PolicyResult.SUCCESS);
558 * Tests retry functions, when the outcome is {@code null}.
561 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
563 // arrange to return null from doOperation()
564 oper = new MyOper() {
566 protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
567 ControlLoopOperation operation) {
570 super.doOperation(params, attempt, operation);
575 oper.configure(new TreeMap<>());
578 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
582 public void testGetActorOutcome() {
583 assertNull(oper.getActorOutcome(null));
585 ControlLoopOperation outcome = params.makeOutcome();
586 outcome.setOutcome(TARGET);
588 // wrong actor - should be null
589 outcome.setActor(null);
590 assertNull(oper.getActorOutcome(outcome));
591 outcome.setActor(TARGET);
592 assertNull(oper.getActorOutcome(outcome));
593 outcome.setActor(ACTOR);
595 // wrong operation - should be null
596 outcome.setOperation(null);
597 assertNull(oper.getActorOutcome(outcome));
598 outcome.setOperation(TARGET);
599 assertNull(oper.getActorOutcome(outcome));
600 outcome.setOperation(OPERATOR);
602 assertEquals(TARGET, oper.getActorOutcome(outcome));
606 public void testOnSuccess() throws Exception {
607 AtomicInteger count = new AtomicInteger();
609 final Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> nextStep = oper -> {
610 count.incrementAndGet();
611 return CompletableFuture.completedFuture(oper);
614 // pass it a null outcome
615 ControlLoopOperation outcome = oper.onSuccess(params, nextStep).apply(null).get();
616 assertNotNull(outcome);
617 assertEquals(PolicyResult.FAILURE.toString(), outcome.getOutcome());
618 assertEquals(0, count.get());
620 // pass it an unpopulated (i.e., failed) outcome
621 outcome = new ControlLoopOperation();
622 assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get());
623 assertEquals(0, count.get());
625 // pass it a successful outcome
626 outcome = params.makeOutcome();
627 outcome.setOutcome(PolicyResult.SUCCESS.toString());
628 assertSame(outcome, oper.onSuccess(params, nextStep).apply(outcome).get());
629 assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome());
630 assertEquals(1, count.get());
634 * Tests onSuccess() and handleFailure() when the outcome is a success.
637 public void testOnSuccessTrue_testHandleFailureTrue() {
638 // arrange to return a success from the preprocessor
639 oper.setPreProcessor(oper -> {
640 oper.setOutcome(PolicyResult.SUCCESS.toString());
641 return CompletableFuture.completedFuture(oper);
644 verifyRun("testOnSuccessTrue_testHandleFailureTrue", 1, 1, PolicyResult.SUCCESS);
648 * Tests onSuccess() and handleFailure() when the outcome is <i>not</i> a success.
651 public void testOnSuccessFalse_testHandleFailureFalse() throws Exception {
652 // arrange to return a failure from the preprocessor
653 oper.setPreProcessor(oper -> {
654 oper.setOutcome(PolicyResult.FAILURE.toString());
655 return CompletableFuture.completedFuture(oper);
658 verifyRun("testOnSuccessFalse_testHandleFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
662 * Tests onSuccess() and handleFailure() when the outcome is {@code null}.
665 public void testOnSuccessFalse_testHandleFailureNull() throws Exception {
666 // arrange to return null from the preprocessor
667 oper.setPreProcessor(oper -> {
668 return CompletableFuture.completedFuture(null);
671 verifyRun("testOnSuccessFalse_testHandleFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
675 public void testFromException() {
676 // arrange to generate an exception when operation runs
677 oper.setGenException(true);
679 verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
683 * Tests fromException() when there is no exception.
686 public void testFromExceptionNoExcept() {
687 verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
691 * Tests verifyRunning() when the pipeline is not running.
694 public void testVerifyRunningWhenNot() {
695 verifyRun("testVerifyRunningWhenNot", 0, 0, PolicyResult.SUCCESS, future -> future.cancel(false));
699 * Tests callbackStarted() when the pipeline has already been stopped.
702 public void testCallbackStartedNotRunning() {
703 AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>();
706 * arrange to stop the controller when the start-callback is invoked, but capture
709 params = params.toBuilder().startCallback(oper -> {
711 future.get().cancel(false);
714 future.set(oper.startOperation(params));
715 assertTrue(executor.runAll());
717 // should have only run once
718 assertEquals(1, numStart);
722 * Tests callbackCompleted() when the pipeline has already been stopped.
725 public void testCallbackCompletedNotRunning() {
726 AtomicReference<Future<ControlLoopOperation>> future = new AtomicReference<>();
728 // arrange to stop the controller when the start-callback is invoked
729 params = params.toBuilder().startCallback(oper -> {
730 future.get().cancel(false);
733 future.set(oper.startOperation(params));
734 assertTrue(executor.runAll());
736 // should not have been set
738 assertEquals(0, numEnd);
742 public void testSetOutcomeControlLoopOperationThrowable() {
743 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
745 ControlLoopOperation outcome;
747 outcome = new ControlLoopOperation();
748 oper.setOutcome(params, outcome, timex);
749 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
750 assertEquals(PolicyResult.FAILURE_TIMEOUT.toString(), outcome.getOutcome());
752 outcome = new ControlLoopOperation();
753 oper.setOutcome(params, outcome, new IllegalStateException());
754 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
755 assertEquals(PolicyResult.FAILURE_EXCEPTION.toString(), outcome.getOutcome());
759 public void testSetOutcomeControlLoopOperationPolicyResult() {
760 ControlLoopOperation outcome;
762 outcome = new ControlLoopOperation();
763 oper.setOutcome(params, outcome, PolicyResult.SUCCESS);
764 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
765 assertEquals(PolicyResult.SUCCESS.toString(), outcome.getOutcome());
767 for (PolicyResult result : FAILURE_RESULTS) {
768 outcome = new ControlLoopOperation();
769 oper.setOutcome(params, outcome, result);
770 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
771 assertEquals(result.toString(), result.toString(), outcome.getOutcome());
776 public void testIsTimeout() {
777 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
779 assertFalse(oper.isTimeout(new IllegalStateException()));
780 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
781 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
782 assertFalse(oper.isTimeout(new CompletionException(null)));
783 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
785 assertTrue(oper.isTimeout(timex));
786 assertTrue(oper.isTimeout(new CompletionException(timex)));
790 public void testGetTimeOutMillis() {
791 assertEquals(TIMEOUT * 1000, oper.getTimeOutMillis(policy));
793 policy.setTimeout(null);
794 assertEquals(0, oper.getTimeOutMillis(policy));
797 private void starter(ControlLoopOperation oper) {
799 tstart = oper.getStart();
803 private void completer(ControlLoopOperation oper) {
809 * Gets a function that does nothing.
811 * @param <T> type of input parameter expected by the function
812 * @return a function that does nothing
814 private <T> Consumer<T> noop() {
822 * @param testName test name
823 * @param expectedCallbacks number of callbacks expected
824 * @param expectedOperations number of operation invocations expected
825 * @param expectedResult expected outcome
827 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
828 PolicyResult expectedResult) {
830 String expectedSubRequestId =
831 (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
833 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
839 * @param testName test name
840 * @param expectedCallbacks number of callbacks expected
841 * @param expectedOperations number of operation invocations expected
842 * @param expectedResult expected outcome
843 * @param manipulator function to modify the future returned by
844 * {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before
845 * the tasks in the executor are run
847 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
848 Consumer<CompletableFuture<ControlLoopOperation>> manipulator) {
850 String expectedSubRequestId =
851 (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
853 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, manipulator);
859 * @param testName test name
860 * @param expectedCallbacks number of callbacks expected
861 * @param expectedOperations number of operation invocations expected
862 * @param expectedResult expected outcome
863 * @param expectedSubRequestId expected sub request ID
864 * @param manipulator function to modify the future returned by
865 * {@link OperatorPartial#startOperation(ControlLoopOperationParams)} before
866 * the tasks in the executor are run
868 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
869 String expectedSubRequestId, Consumer<CompletableFuture<ControlLoopOperation>> manipulator) {
871 CompletableFuture<ControlLoopOperation> future = oper.startOperation(params);
873 manipulator.accept(future);
875 assertTrue(testName, executor.runAll());
877 assertEquals(testName, expectedCallbacks, numStart);
878 assertEquals(testName, expectedCallbacks, numEnd);
880 if (expectedCallbacks > 0) {
881 assertNotNull(testName, opstart);
882 assertNotNull(testName, opend);
883 assertEquals(testName, expectedResult.toString(), opend.getOutcome());
885 assertSame(testName, tstart, opstart.getStart());
886 assertSame(testName, tstart, opend.getStart());
889 assertTrue(future.isDone());
890 assertSame(testName, opend, future.get());
892 } catch (InterruptedException | ExecutionException e) {
893 throw new IllegalStateException(e);
896 if (expectedOperations > 0) {
897 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
901 assertEquals(testName, expectedOperations, oper.getCount());
904 private static class MyOper extends OperatorPartial {
906 private int count = 0;
909 private boolean genException;
912 private int maxFailures = 0;
915 private Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> preProcessor;
918 super(ACTOR, OPERATOR);
922 protected ControlLoopOperation doOperation(ControlLoopOperationParams params, int attempt,
923 ControlLoopOperation operation) {
926 throw new IllegalStateException(EXPECTED_EXCEPTION);
929 operation.setSubRequestId(String.valueOf(attempt));
931 if (count > maxFailures) {
932 operation.setOutcome(PolicyResult.SUCCESS.toString());
934 operation.setOutcome(PolicyResult.FAILURE.toString());
941 protected Function<ControlLoopOperation, CompletableFuture<ControlLoopOperation>> doPreprocessorAsFuture(
942 ControlLoopOperationParams params) {
944 return (preProcessor != null ? preProcessor : super.doPreprocessorAsFuture(params));
949 * Executor that will run tasks until the queue is empty or a maximum number of tasks
950 * have been executed.
952 private static class MyExec implements Executor {
953 private static final int MAX_TASKS = MAX_PARALLEL_REQUESTS * 100;
955 private Queue<Runnable> commands = new LinkedList<>();
961 public int getQueueLength() {
962 return commands.size();
966 public void execute(Runnable command) {
967 commands.add(command);
970 public boolean runAll() {
971 for (int count = 0; count < MAX_TASKS && !commands.isEmpty(); ++count) {
972 commands.remove().run();
975 return commands.isEmpty();