2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
33 import ch.qos.logback.classic.Logger;
34 import java.time.Instant;
35 import java.util.Arrays;
36 import java.util.LinkedList;
37 import java.util.List;
39 import java.util.Map.Entry;
40 import java.util.UUID;
41 import java.util.concurrent.CompletableFuture;
42 import java.util.concurrent.CompletionException;
43 import java.util.concurrent.ExecutionException;
44 import java.util.concurrent.Executor;
45 import java.util.concurrent.ForkJoinPool;
46 import java.util.concurrent.Future;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import java.util.concurrent.atomic.AtomicInteger;
50 import java.util.concurrent.atomic.AtomicReference;
51 import java.util.function.Consumer;
52 import java.util.function.Supplier;
53 import java.util.stream.Collectors;
56 import org.junit.AfterClass;
57 import org.junit.Before;
58 import org.junit.BeforeClass;
59 import org.junit.Test;
60 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
61 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
62 import org.onap.policy.common.utils.coder.Coder;
63 import org.onap.policy.common.utils.coder.CoderException;
64 import org.onap.policy.common.utils.coder.StandardCoder;
65 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
66 import org.onap.policy.common.utils.time.PseudoExecutor;
67 import org.onap.policy.controlloop.ControlLoopOperation;
68 import org.onap.policy.controlloop.VirtualControlLoopEvent;
69 import org.onap.policy.controlloop.actorserviceprovider.Operation;
70 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
71 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
72 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
73 import org.onap.policy.controlloop.policy.PolicyResult;
74 import org.slf4j.LoggerFactory;
76 public class OperationPartialTest {
// Communication infrastructures passed to logMessage() for outgoing and incoming events.
77 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
78 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
// Base value handed to executor.runAll() when the tests drain the pseudo executor.
79 private static final int MAX_REQUESTS = 100;
// Loop bound and expected counts in testStartMultiple().
80 private static final int MAX_PARALLEL = 10;
// Message carried by the exceptions that the tests raise on purpose.
81 private static final String EXPECTED_EXCEPTION = "expected exception";
// Actor and operation names used to build both the parameters and the operator.
82 private static final String ACTOR = "my-actor";
83 private static final String OPERATION = "my-operation";
// Target entity of the parameters; also the sink name in testLogMessage() and a
// deliberately mismatched actor/operation name in the comparison tests.
84 private static final String MY_SINK = "my-sink";
// Source name used when logging an incoming message.
85 private static final String MY_SOURCE = "my-source";
// Plain-string payload logged by testLogMessage().
86 private static final String TEXT = "my-text";
// Operation timeout, in seconds, placed into the parameters.
87 private static final int TIMEOUT = 1000;
// Request ID assigned to the event; generated once when the class is initialized.
88 private static final UUID REQ_ID = UUID.randomUUID();
90 private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
91 .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
94 * Used to attach an appender to the class' logger.
// Logger of the class under test, cast to the logback type so an appender can be added.
96 private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
// Appender whose extracted output is inspected by testLogMessage().
97 private static final ExtractAppender appender = new ExtractAppender();
// Event, and the context wrapping it, created by setUp().
99 private VirtualControlLoopEvent event;
100 private ControlLoopEventContext context;
// Executor whose queue the tests drain explicitly via runAll().
101 private PseudoExecutor executor;
// Operation parameters; several tests rebuild them via toBuilder().
102 private ControlLoopOperationParams params;
// Asserted by several tests; presumably counts invocations of the start callback - confirm
// against starter().
106 private int numStart;
// Start time recorded by testSleep() to measure the elapsed time.
109 private Instant tstart;
// NOTE(review): presumably the outcomes last seen by the start and complete callbacks -
// confirm against starter()/completer().
111 private OperationOutcome opstart;
112 private OperationOutcome opend;
// Operator handed to the operations under test; built and configured in setUp().
114 private OperatorPartial operator;
117 * Attaches the appender to the logger.
// Gives the shared appender the logger's context and then registers it with the logger of
// the class under test, so that the tests can extract what was logged.
120 public static void setUpBeforeClass() throws Exception {
122 * Attach appender to the logger.
124 appender.setContext(logger.getLoggerContext());
127 logger.addAppender(appender);
131 * Stops the appender.
// Class-level cleanup counterpart of setUpBeforeClass().
// NOTE(review): documented as stopping the appender - confirm against the method body.
134 public static void tearDownAfterClass() {
139 * Initializes the fields, including {@link #oper}.
// Rebuilds the fixture: a fresh event and context, a pseudo executor, the operation
// parameters, and an operator stub that is then configured.
142 public void setUp() {
143 event = new VirtualControlLoopEvent();
144 event.setRequestId(REQ_ID);
146 context = new ControlLoopEventContext(event);
// the tests decide when queued work runs by calling runAll() on this executor
147 executor = new PseudoExecutor();
// route the start and complete callbacks to this test class
149 params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
150 .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
151 .startCallback(this::starter).targetEntity(MY_SINK).build();
// operator stub that overrides the blocking executor and the operation factory
153 operator = new OperatorPartial(ACTOR, OPERATION) {
155 public Executor getBlockingExecutor() {
160 public Operation buildOperation(ControlLoopOperationParams params) {
// no configuration parameters are supplied
165 operator.configure(null);
177 public void testOperatorPartial_testGetActorName_testGetName() {
178 assertEquals(ACTOR, oper.getActorName());
179 assertEquals(OPERATION, oper.getName());
180 assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
// Verifies that a task handed to an operator's own blocking executor actually runs.
184 public void testGetBlockingThread() throws Exception {
// completed by the task, which proves that the task was executed
185 CompletableFuture<Void> future = new CompletableFuture<>();
187 // use the real executor
188 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
190 public Operation buildOperation(ControlLoopOperationParams params) {
195 oper2.getBlockingExecutor().execute(() -> future.complete(null));
// bounded wait, so that a task that never runs fails the test instead of hanging it
197 assertNull(future.get(5, TimeUnit.SECONDS));
201 * Exercises the doXxx() methods.
204 public void testDoXxx() {
205 assertThatCode(() -> operator.doConfigure(null)).doesNotThrowAnyException();
206 assertThatCode(() -> operator.doStart()).doesNotThrowAnyException();
207 assertThatCode(() -> operator.doStop()).doesNotThrowAnyException();
208 assertThatCode(() -> operator.doShutdown()).doesNotThrowAnyException();
213 public void testStart() {
214 verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
218 * Tests startOperation() when the operator is not running.
221 public void testStartNotRunning() {
// start() is expected to be rejected with an IllegalStateException
225 assertThatIllegalStateException().isThrownBy(() -> oper.start());
229 * Tests startOperation() when the operation has a preprocessor.
232 public void testStartWithPreprocessor() {
// counts how many times the guard supplier is evaluated
233 AtomicInteger count = new AtomicInteger();
235 CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
236 count.incrementAndGet();
237 return makeSuccess();
// install the asynchronous supplier as the operation's guard
240 oper.setGuard(preproc);
242 verifyRun("testStartWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
// the guard supplier must have been evaluated exactly once
244 assertEquals(1, count.get());
248 * Tests start() with multiple running requests.
251 public void testStartMultiple() {
252 for (int count = 0; count < MAX_PARALLEL; ++count) {
// the value given to runAll() is scaled by the number of loop iterations
256 assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
// both callbacks must have recorded an outcome, the final one being a success
258 assertNotNull(opstart);
259 assertNotNull(opend);
260 assertEquals(PolicyResult.SUCCESS, opend.getResult());
// all three counters are expected to equal the number of loop iterations
262 assertEquals(MAX_PARALLEL, numStart);
263 assertEquals(MAX_PARALLEL, oper.getCount());
264 assertEquals(MAX_PARALLEL, numEnd);
268 * Tests startPreprocessor() when the preprocessor returns a failure.
271 public void testStartPreprocessorFailure() {
272 oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
274 verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
278 * Tests startPreprocessor() when the preprocessor throws an exception.
281 public void testStartPreprocessorException() {
282 // arrange for the preprocessor to throw an exception
283 oper.setGuard(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
285 verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
289 * Tests startPreprocessor() when the pipeline is not running.
292 public void testStartPreprocessorNotRunning() {
293 // arrange for the preprocessor to return success, which will be ignored
294 oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
// cancel the returned future before any queued work has been run
296 oper.start().cancel(false);
297 assertTrue(executor.runAll(MAX_REQUESTS));
// all three counters are expected to stay at zero
302 assertEquals(0, numStart);
303 assertEquals(0, oper.getCount());
304 assertEquals(0, numEnd);
308 * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
311 public void testStartPreprocessorBuilderException() {
// operation whose preprocessor factory itself throws
312 oper = new MyOper() {
314 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
315 throw new IllegalStateException(EXPECTED_EXCEPTION);
// the exception is expected to surface directly from start()
319 assertThatIllegalStateException().isThrownBy(() -> oper.start());
321 // should be nothing in the queue
322 assertEquals(0, executor.getQueueLength());
326 public void testStartPreprocessorAsync() {
327 assertNull(oper.startPreprocessorAsync());
331 public void testStartGuardAsync() {
332 assertNull(oper.startGuardAsync());
336 public void testStartOperationAsync() {
// drain the executor, then expect the operation's counter to read exactly one
338 assertTrue(executor.runAll(MAX_REQUESTS));
340 assertEquals(1, oper.getCount());
344 public void testIsSuccess() {
345 OperationOutcome outcome = new OperationOutcome();
347 outcome.setResult(PolicyResult.SUCCESS);
348 assertTrue(oper.isSuccess(outcome));
350 for (PolicyResult failure : FAILURE_RESULTS) {
351 outcome.setResult(failure);
352 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
// Walks isActorFailed() through a null outcome, results other than FAILURE, mismatched
// actor and operation names, and finally the combination that must be reported as failed.
357 public void testIsActorFailed() {
358 assertFalse(oper.isActorFailed(null));
360 OperationOutcome outcome = params.makeOutcome();
// results other than FAILURE are not reported as an actor failure
363 outcome.setResult(PolicyResult.SUCCESS);
364 assertFalse(oper.isActorFailed(outcome));
366 outcome.setResult(PolicyResult.FAILURE_RETRIES);
367 assertFalse(oper.isActorFailed(outcome));
// FAILURE is the result used for all remaining checks
370 outcome.setResult(PolicyResult.FAILURE);
// actor is different or missing
373 outcome.setActor(MY_SINK);
374 assertFalse(oper.isActorFailed(outcome));
375 outcome.setActor(null);
376 assertFalse(oper.isActorFailed(outcome));
377 outcome.setActor(ACTOR);
379 // incorrect operation
380 outcome.setOperation(MY_SINK);
381 assertFalse(oper.isActorFailed(outcome));
382 outcome.setOperation(null);
383 assertFalse(oper.isActorFailed(outcome));
384 outcome.setOperation(OPERATION);
// FAILURE result together with matching actor and operation
387 assertTrue(oper.isActorFailed(outcome));
// Expects an operation that relies on the inherited doOperation() to finish with
// FAILURE_EXCEPTION.
391 public void testDoOperation() {
393 * Use an operation that doesn't override doOperation().
395 OperationPartial oper2 = new OperationPartial(params, operator) {};
398 assertTrue(executor.runAll(MAX_REQUESTS));
// a final outcome must have been recorded, and it must report the exception
400 assertNotNull(opend);
401 assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
// Expects an operation whose work never completes on its own to finish with
// FAILURE_TIMEOUT.
405 public void testTimeout() throws Exception {
407 // use a real executor
408 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
410 // trigger timeout very quickly
411 oper = new MyOper() {
413 protected long getTimeoutMs(Integer timeoutSec) {
418 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
420 OperationOutcome outcome2 = params.makeOutcome();
421 outcome2.setResult(PolicyResult.SUCCESS);
424 * Create an incomplete future that will timeout after the operation's
425 * timeout. If it fires before the other timer, then it will return a
428 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
// NOTE(review): the handler returns `outcome`, not the `outcome2` prepared above - confirm
// which one is intended.
429 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
430 params.getExecutor());
// blocks on the real executor until the operation's future completes
436 assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
440 * Tests retry functions, when the count is set to zero and retries are exhausted.
443 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
// explicit retry count of zero
444 params = params.toBuilder().retry(0).build();
446 // new params, thus need a new operation
// presumably makes the stub operation fail on its first 10 attempts - confirm against MyOper
449 oper.setMaxFailures(10);
// counts of one and a plain FAILURE are expected
451 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
455 * Tests retry functions, when the count is null and retries are exhausted.
458 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
// retry count left unset
459 params = params.toBuilder().retry(null).build();
461 // new params, thus need a new operation
// presumably makes the stub operation fail on its first 10 attempts - confirm against MyOper
464 oper.setMaxFailures(10);
// same expectation as for a retry count of zero
466 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
470 * Tests retry functions, when retries are exhausted.
473 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
474 final int maxRetries = 3;
475 params = params.toBuilder().retry(maxRetries).build();
477 // new params, thus need a new operation
// more failures than retries, so the retry limit is what ends the run
480 oper.setMaxFailures(10);
// one initial attempt plus maxRetries retries
482 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
483 PolicyResult.FAILURE_RETRIES);
487 * Tests retry functions, when a success follows some retries.
490 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
// more retries allowed than failures configured below
491 params = params.toBuilder().retry(10).build();
493 // new params, thus need a new operation
496 final int maxFailures = 3;
497 oper.setMaxFailures(maxFailures);
// maxFailures failing attempts followed by one successful attempt
499 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
500 PolicyResult.SUCCESS);
504 * Tests retry functions, when the outcome is {@code null}.
507 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
509 // arrange to return null from doOperation()
510 oper = new MyOper() {
512 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
// still delegates to the superclass; its return value is not used on this line
515 super.doOperation(attempt, operation);
// a missing outcome is expected to be reported as a plain FAILURE
520 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
// Exercises sleep(): negative and zero durations are expected to yield an already
// completed future, while a positive duration yields a future that is initially pending.
524 public void testSleep() throws Exception {
525 CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
526 assertTrue(future.isDone());
527 assertNull(future.get());
530 future = oper.sleep(0, TimeUnit.SECONDS);
531 assertTrue(future.isDone());
532 assertNull(future.get());
535 * Start a second sleep we can use to check the first while it's running.
537 tstart = Instant.now();
538 future = oper.sleep(100, TimeUnit.MILLISECONDS);
540 CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
542 // wait for second to complete and verify that the first has not completed
544 assertFalse(future.isDone());
// NOTE(review): the next comment repeats the one above; the elapsed-time check below
// concerns the 100 ms sleep, so presumably the first future is meant - confirm.
546 // wait for second to complete
// NOTE(review): wall-clock assertion; the bound is 1 ms below the requested 100 ms
549 long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
550 assertTrue(diff >= 99);
554 public void testIsSameOperation() {
555 assertFalse(oper.isSameOperation(null));
557 OperationOutcome outcome = params.makeOutcome();
559 // wrong actor - should be false
560 outcome.setActor(null);
561 assertFalse(oper.isSameOperation(outcome));
562 outcome.setActor(MY_SINK);
563 assertFalse(oper.isSameOperation(outcome));
564 outcome.setActor(ACTOR);
566 // wrong operation - should be null
567 outcome.setOperation(null);
568 assertFalse(oper.isSameOperation(outcome));
569 outcome.setOperation(MY_SINK);
570 assertFalse(oper.isSameOperation(outcome));
571 outcome.setOperation(OPERATION);
573 assertTrue(oper.isSameOperation(outcome));
577 * Tests handleFailure() when the outcome is a success.
580 public void testHandlePreprocessorFailureTrue() {
581 oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
582 verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
586 * Tests handleFailure() when the outcome is <i>not</i> a success.
589 public void testHandlePreprocessorFailureFalse() throws Exception {
590 oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
591 verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
595 * Tests handleFailure() when the outcome is {@code null}.
598 public void testHandlePreprocessorFailureNull() throws Exception {
599 // arrange to return null from the preprocessor
600 oper.setGuard(CompletableFuture.completedFuture(null));
602 verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
606 public void testFromException() {
607 // arrange to generate an exception when operation runs
608 oper.setGenException(true);
610 verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
614 * Tests fromException() when there is no exception.
617 public void testFromExceptionNoExcept() {
618 verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
622 * Tests both flavors of anyOf(), because one invokes the other.
// Expects anyOf() to complete with the outcome of whichever task completes, wherever that
// task sits among tasks that never complete.
625 public void testAnyOf() throws Exception {
626 // first task completes, others do not
627 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
629 final OperationOutcome outcome = params.makeOutcome();
631 tasks.add(() -> CompletableFuture.completedFuture(outcome));
632 tasks.add(() -> new CompletableFuture<>());
// one supplier deliberately yields null instead of a future
633 tasks.add(() -> null);
634 tasks.add(() -> new CompletableFuture<>());
636 CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
637 assertTrue(executor.runAll(MAX_REQUESTS));
638 assertTrue(result.isDone());
639 assertSame(outcome, result.get());
641 // repeat using array form
642 @SuppressWarnings("unchecked")
643 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
644 result = oper.anyOf(tasks.toArray(taskArray));
645 assertTrue(executor.runAll(MAX_REQUESTS));
646 assertTrue(result.isDone());
647 assertSame(outcome, result.get());
649 // second task completes, others do not
651 tasks.add(() -> new CompletableFuture<>());
652 tasks.add(() -> CompletableFuture.completedFuture(outcome));
653 tasks.add(() -> new CompletableFuture<>());
655 result = oper.anyOf(tasks);
656 assertTrue(executor.runAll(MAX_REQUESTS));
657 assertTrue(result.isDone());
658 assertSame(outcome, result.get());
660 // third task completes, others do not
662 tasks.add(() -> new CompletableFuture<>());
663 tasks.add(() -> new CompletableFuture<>());
664 tasks.add(() -> CompletableFuture.completedFuture(outcome));
666 result = oper.anyOf(tasks);
667 assertTrue(executor.runAll(MAX_REQUESTS));
668 assertTrue(result.isDone());
669 assertSame(outcome, result.get());
673 * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
676 @SuppressWarnings("unchecked")
677 public void testAnyOfEdge() throws Exception {
678 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
680 // zero items: check both using a list and using an array
681 assertNull(oper.anyOf(tasks));
682 assertNull(oper.anyOf());
684 // one item: : check both using a list and using an array
685 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
686 tasks.add(() -> future1);
688 assertSame(future1, oper.anyOf(tasks));
689 assertSame(future1, oper.anyOf(() -> future1));
693 public void testAllOfArray() throws Exception {
694 final OperationOutcome outcome = params.makeOutcome();
696 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
697 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
698 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
700 @SuppressWarnings("unchecked")
701 CompletableFuture<OperationOutcome> result =
702 oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
704 assertTrue(executor.runAll(MAX_REQUESTS));
705 assertFalse(result.isDone());
706 future1.complete(outcome);
708 // complete 3 before 2
709 assertTrue(executor.runAll(MAX_REQUESTS));
710 assertFalse(result.isDone());
711 future3.complete(outcome);
713 assertTrue(executor.runAll(MAX_REQUESTS));
714 assertFalse(result.isDone());
715 future2.complete(outcome);
717 // all of them are now done
718 assertTrue(executor.runAll(MAX_REQUESTS));
719 assertTrue(result.isDone());
720 assertSame(outcome, result.get());
724 public void testAllOfList() throws Exception {
725 final OperationOutcome outcome = params.makeOutcome();
727 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
728 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
729 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
731 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
732 tasks.add(() -> future1);
733 tasks.add(() -> future2);
734 tasks.add(() -> null);
735 tasks.add(() -> future3);
737 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
739 assertTrue(executor.runAll(MAX_REQUESTS));
740 assertFalse(result.isDone());
741 future1.complete(outcome);
743 // complete 3 before 2
744 assertTrue(executor.runAll(MAX_REQUESTS));
745 assertFalse(result.isDone());
746 future3.complete(outcome);
748 assertTrue(executor.runAll(MAX_REQUESTS));
749 assertFalse(result.isDone());
750 future2.complete(outcome);
752 // all of them are now done
753 assertTrue(executor.runAll(MAX_REQUESTS));
754 assertTrue(result.isDone());
755 assertSame(outcome, result.get());
759 * Tests both flavors of allOf(), for edge cases: zero items, and one item.
762 @SuppressWarnings("unchecked")
763 public void testAllOfEdge() throws Exception {
764 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
766 // zero items: check both using a list and using an array
767 assertNull(oper.allOf(tasks));
768 assertNull(oper.allOf());
770 // one item: : check both using a list and using an array
771 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
772 tasks.add(() -> future1);
774 assertSame(future1, oper.allOf(tasks));
775 assertSame(future1, oper.allOf(() -> future1));
// Expects that, when one task supplier throws, the futures obtained before it are
// cancelled and the exception reaches the caller.
779 public void testAttachFutures() throws Exception {
780 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
782 // third task throws an exception during construction
783 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
784 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
785 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
786 tasks.add(() -> future1);
787 tasks.add(() -> future2);
789 throw new IllegalStateException(EXPECTED_EXCEPTION);
791 tasks.add(() -> future3);
// the supplier's exception is expected to propagate unchanged, message included
793 assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
795 // should have canceled the first two, but not the last
796 assertTrue(future1.isCancelled());
797 assertTrue(future2.isCancelled());
798 assertFalse(future3.isCancelled());
// Checks which outcome allOf() selects when several tasks complete, including a null
// outcome and a task that completes exceptionally.
802 public void testCombineOutcomes() throws Exception {
// single outcome: index 0 is the only candidate
804 verifyOutcomes(0, PolicyResult.SUCCESS);
805 verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
807 // maximum is in different positions
808 verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
809 verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
810 verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
812 // null outcome - takes precedence over a success
813 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
814 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
815 tasks.add(() -> CompletableFuture.completedFuture(null));
816 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
817 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
819 assertTrue(executor.runAll(MAX_REQUESTS));
820 assertTrue(result.isDone());
821 assertNull(result.get());
823 // one throws an exception during execution
824 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
827 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
828 tasks.add(() -> CompletableFuture.failedFuture(except));
829 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
830 result = oper.allOf(tasks);
832 assertTrue(executor.runAll(MAX_REQUESTS));
833 assertTrue(result.isCompletedExceptionally());
// NOTE(review): an AssertionError thrown inside this callback is captured by the future
// that whenComplete() returns, and that future is discarded, so this check cannot fail the
// test; consider capturing `thrown` and asserting outside the callback.
834 result.whenComplete((unused, thrown) -> assertSame(except, thrown));
838 * Tests both flavors of sequence(), because one invokes the other.
// Expects sequence() to yield the shared outcome when every task succeeds, and the failing
// outcome when a task in the middle fails.
841 public void testSequence() throws Exception {
842 final OperationOutcome outcome = params.makeOutcome();
844 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
845 tasks.add(() -> CompletableFuture.completedFuture(outcome));
// one supplier deliberately yields null instead of a future
846 tasks.add(() -> null);
847 tasks.add(() -> CompletableFuture.completedFuture(outcome));
848 tasks.add(() -> CompletableFuture.completedFuture(outcome));
850 CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
851 assertTrue(executor.runAll(MAX_REQUESTS));
852 assertTrue(result.isDone());
853 assertSame(outcome, result.get());
855 // repeat using array form
856 @SuppressWarnings("unchecked")
857 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
858 result = oper.sequence(tasks.toArray(taskArray));
859 assertTrue(executor.runAll(MAX_REQUESTS));
860 assertTrue(result.isDone());
861 assertSame(outcome, result.get());
863 // second task fails, third should not run
864 OperationOutcome failure = params.makeOutcome();
865 failure.setResult(PolicyResult.FAILURE);
867 tasks.add(() -> CompletableFuture.completedFuture(outcome));
868 tasks.add(() -> CompletableFuture.completedFuture(failure));
869 tasks.add(() -> CompletableFuture.completedFuture(outcome));
871 result = oper.sequence(tasks);
872 assertTrue(executor.runAll(MAX_REQUESTS));
873 assertTrue(result.isDone());
// the failing outcome itself, not a later success, must be the result
874 assertSame(failure, result.get());
878 * Tests both flavors of sequence(), for edge cases: zero items, and one item.
881 @SuppressWarnings("unchecked")
882 public void testSequenceEdge() throws Exception {
883 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
885 // zero items: check both using a list and using an array
886 assertNull(oper.sequence(tasks));
887 assertNull(oper.sequence());
889 // one item: : check both using a list and using an array
890 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
891 tasks.add(() -> future1);
893 assertSame(future1, oper.sequence(tasks));
894 assertSame(future1, oper.sequence(() -> future1));
897 private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
898 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
900 OperationOutcome expectedOutcome = null;
902 for (int count = 0; count < results.length; ++count) {
903 OperationOutcome outcome = params.makeOutcome();
904 outcome.setResult(results[count]);
905 tasks.add(() -> CompletableFuture.completedFuture(outcome));
907 if (count == expected) {
908 expectedOutcome = outcome;
912 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
914 assertTrue(executor.runAll(MAX_REQUESTS));
915 assertTrue(result.isDone());
916 assertSame(expectedOutcome, result.get());
920 public void testDetmPriority() throws CoderException {
921 assertEquals(1, oper.detmPriority(null));
923 OperationOutcome outcome = params.makeOutcome();
925 Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
926 PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
927 PolicyResult.FAILURE_EXCEPTION, 6);
929 for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
930 outcome.setResult(ent.getKey());
931 assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
935 * Test null result. We can't actually set it to null, because the set() method
936 * won't allow it. Instead, we decode it from a structure.
938 outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
939 assertEquals(1, oper.detmPriority(outcome));
943 * Tests callbackStarted() when the pipeline has already been stopped.
946 public void testCallbackStartedNotRunning() {
// holds the operation's future; it is filled in below, before the executor is drained
947 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
950 * arrange to stop the controller when the start-callback is invoked, but capture
953 params = params.toBuilder().startCallback(oper -> {
955 future.get().cancel(false);
958 // new params, thus need a new operation
961 future.set(oper.start());
962 assertTrue(executor.runAll(MAX_REQUESTS));
964 // should have only run once
965 assertEquals(1, numStart);
969 * Tests callbackCompleted() when the pipeline has already been stopped.
972 public void testCallbackCompletedNotRunning() {
// holds the operation's future; it is filled in below, before the executor is drained
973 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
975 // arrange to stop the controller when the start-callback is invoked
976 params = params.toBuilder().startCallback(oper -> {
977 future.get().cancel(false);
980 // new params, thus need a new operation
983 future.set(oper.start());
984 assertTrue(executor.runAll(MAX_REQUESTS));
986 // should not have been set
// the completion counter must not have moved
988 assertEquals(0, numEnd);
992 public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
993 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
995 OperationOutcome outcome;
997 outcome = new OperationOutcome();
998 oper.setOutcome(outcome, timex);
999 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1000 assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1002 outcome = new OperationOutcome();
1003 oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1004 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1005 assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1009 public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1010 OperationOutcome outcome;
1012 outcome = new OperationOutcome();
1013 oper.setOutcome(outcome, PolicyResult.SUCCESS);
1014 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1015 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1017 for (PolicyResult result : FAILURE_RESULTS) {
1018 outcome = new OperationOutcome();
1019 oper.setOutcome(outcome, result);
1020 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1021 assertEquals(result.toString(), result, outcome.getResult());
1026 public void testIsTimeout() {
1027 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1029 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1030 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1031 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1032 assertFalse(oper.isTimeout(new CompletionException(null)));
1033 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1035 assertTrue(oper.isTimeout(timex));
1036 assertTrue(oper.isTimeout(new CompletionException(timex)));
// Checks what logMessage() writes for structured data, a plain string, a null message, and
// for the case where the coder cannot encode the message.
1040 public void testLogMessage() {
1041 final String infraStr = SINK_INFRA.toString();
1043 // log structured data
1044 appender.clearExtractions();
1045 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1046 List<String> output = appender.getExtracted();
1047 assertEquals(1, output.size());
// the single entry must name the infrastructure, the topic, the direction, and carry the
// encoded payload
1049 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1050 .contains("{\n \"text\": \"my-text\"\n}");
1052 // repeat with a response
1053 appender.clearExtractions();
1054 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1055 output = appender.getExtracted();
1056 assertEquals(1, output.size());
1058 assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1059 .contains("{\n \"text\": \"my-text\"\n}");
1061 // log a plain string
1062 appender.clearExtractions();
1063 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1064 output = appender.getExtracted();
1065 assertEquals(1, output.size());
1066 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1068 // log a null request
1069 appender.clearExtractions();
1070 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1071 output = appender.getExtracted();
1072 assertEquals(1, output.size());
1074 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1076 // generate exception from coder
1077 setOperCoderException();
1079 appender.clearExtractions();
1080 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1081 output = appender.getExtracted();
// two entries are expected: the pretty-print complaint, then the message itself
1082 assertEquals(2, output.size());
1083 assertThat(output.get(0)).contains("cannot pretty-print request");
1084 assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1086 // repeat with a response
1087 appender.clearExtractions();
1088 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1089 output = appender.getExtracted();
1090 assertEquals(2, output.size());
1091 assertThat(output.get(0)).contains("cannot pretty-print response");
1092 assertThat(output.get(1)).contains(MY_SOURCE);
1096 public void testGetRetry() {
1097 assertEquals(0, oper.getRetry(null));
1098 assertEquals(10, oper.getRetry(10));
1102 public void testGetRetryWait() {
1103 // need an operator that doesn't override the retry time
1104 OperationPartial oper2 = new OperationPartial(params, operator) {};
1105 assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1109 public void testGetTimeOutMs() {
1110 assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1112 params = params.toBuilder().timeoutSec(null).build();
1114 // new params, thus need a new operation
1115 oper = new MyOper();
1117 assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1120 private void starter(OperationOutcome oper) {
1122 tstart = oper.getStart();
1126 private void completer(OperationOutcome oper) {
1132 * Gets a function that does nothing.
1134 * @param <T> type of input parameter expected by the function
1135 * @return a function that does nothing
1137 private <T> Consumer<T> noop() {
1142 private OperationOutcome makeSuccess() {
1143 OperationOutcome outcome = params.makeOutcome();
1144 outcome.setResult(PolicyResult.SUCCESS);
1149 private OperationOutcome makeFailure() {
1150 OperationOutcome outcome = params.makeOutcome();
1151 outcome.setResult(PolicyResult.FAILURE);
1159 * @param testName test name
1160 * @param expectedCallbacks number of callbacks expected
1161 * @param expectedOperations number of operation invocations expected
1162 * @param expectedResult expected outcome
1164 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1165 PolicyResult expectedResult) {
1167 String expectedSubRequestId =
1168 (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1170 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1176 * @param testName test name
1177 * @param expectedCallbacks number of callbacks expected
1178 * @param expectedOperations number of operation invocations expected
1179 * @param expectedResult expected outcome
1180 * @param expectedSubRequestId expected sub request ID
1181 * @param manipulator function to modify the future returned by
1182 * {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1183 * in the executor are run
1185 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1186 String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1188 CompletableFuture<OperationOutcome> future = oper.start();
1190 manipulator.accept(future);
1192 assertTrue(testName, executor.runAll(MAX_REQUESTS));
1194 assertEquals(testName, expectedCallbacks, numStart);
1195 assertEquals(testName, expectedCallbacks, numEnd);
1197 if (expectedCallbacks > 0) {
1198 assertNotNull(testName, opstart);
1199 assertNotNull(testName, opend);
1200 assertEquals(testName, expectedResult, opend.getResult());
1202 assertSame(testName, tstart, opstart.getStart());
1203 assertSame(testName, tstart, opend.getStart());
1206 assertTrue(future.isDone());
1207 assertSame(testName, opend, future.get());
1209 } catch (InterruptedException | ExecutionException e) {
1210 throw new IllegalStateException(e);
1213 if (expectedOperations > 0) {
1214 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1218 assertEquals(testName, expectedOperations, oper.getCount());
1222 * Creates a new {@link #oper} whose coder will throw an exception.
1224 private void setOperCoderException() {
1225 oper = new MyOper() {
1227 protected Coder makeCoder() {
1228 return new StandardCoder() {
1230 public String encode(Object object, boolean pretty) throws CoderException {
1231 throw new CoderException(EXPECTED_EXCEPTION);
1240 public static class MyData {
1241 private String text = TEXT;
1245 private class MyOper extends OperationPartial {
1247 private int count = 0;
1250 private boolean genException;
1253 private int maxFailures = 0;
1256 private CompletableFuture<OperationOutcome> guard;
1260 super(OperationPartialTest.this.params, operator);
1264 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1267 throw new IllegalStateException(EXPECTED_EXCEPTION);
1270 operation.setSubRequestId(String.valueOf(attempt));
1272 if (count > maxFailures) {
1273 operation.setResult(PolicyResult.SUCCESS);
1275 operation.setResult(PolicyResult.FAILURE);
1282 protected CompletableFuture<OperationOutcome> startGuardAsync() {
1283 return (guard != null ? guard : super.startGuardAsync());
1287 protected long getRetryWaitMs() {
1289 * Sleep timers run in the background, but we want to control things via the
1290 * "executor", thus we avoid sleep timers altogether by simply returning 0.