2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
32 import ch.qos.logback.classic.Logger;
33 import java.time.Instant;
34 import java.util.Arrays;
35 import java.util.LinkedList;
36 import java.util.List;
38 import java.util.Map.Entry;
39 import java.util.UUID;
40 import java.util.concurrent.CompletableFuture;
41 import java.util.concurrent.CompletionException;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.ForkJoinPool;
44 import java.util.concurrent.Future;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.TimeoutException;
47 import java.util.concurrent.atomic.AtomicInteger;
48 import java.util.concurrent.atomic.AtomicReference;
49 import java.util.function.Consumer;
50 import java.util.function.Supplier;
51 import java.util.stream.Collectors;
54 import org.junit.AfterClass;
55 import org.junit.Before;
56 import org.junit.BeforeClass;
57 import org.junit.Test;
58 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
59 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
60 import org.onap.policy.common.utils.coder.Coder;
61 import org.onap.policy.common.utils.coder.CoderException;
62 import org.onap.policy.common.utils.coder.StandardCoder;
63 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
64 import org.onap.policy.common.utils.time.PseudoExecutor;
65 import org.onap.policy.controlloop.ControlLoopOperation;
66 import org.onap.policy.controlloop.VirtualControlLoopEvent;
67 import org.onap.policy.controlloop.actorserviceprovider.Operation;
68 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
69 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
70 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
71 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
72 import org.onap.policy.controlloop.policy.PolicyResult;
73 import org.slf4j.LoggerFactory;
75 public class OperationPartialTest {
// Constants shared by the tests in this class.
// SINK_INFRA / SOURCE_INFRA: topic infrastructures used by testLogMessage().
76 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
77 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
// Upper bound handed to executor.runAll() so a runaway pipeline cannot loop forever.
78 private static final int MAX_REQUESTS = 100;
// Number of concurrent starts issued by testStartMultiple().
79 private static final int MAX_PARALLEL = 10;
// Message carried by the exceptions that tests throw on purpose.
80 private static final String EXPECTED_EXCEPTION = "expected exception";
81 private static final String ACTOR = "my-actor";
82 private static final String OPERATION = "my-operation";
83 private static final String MY_SINK = "my-sink";
84 private static final String MY_SOURCE = "my-source";
85 private static final String TEXT = "my-text";
// Passed to timeoutSec() in setUp(), i.e. expressed in seconds.
86 private static final int TIMEOUT = 1000;
87 private static final UUID REQ_ID = UUID.randomUUID();
// Every PolicyResult value except SUCCESS.
89 private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
90 .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
93 * Used to attach an appender to the class' logger.
// Logger of the class under test (OperationPartial) and the appender that captures its output.
95 private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
96 private static final ExtractAppender appender = new ExtractAppender();
// Per-test fixtures, re-created by setUp().
98 private VirtualControlLoopEvent event;
99 private ControlLoopEventContext context;
100 private PseudoExecutor executor;
101 private ControlLoopOperationParams params;
// presumably the number of start-callback invocations - confirm against starter()
105 private int numStart;
// Set by starter() from the outcome's start time; testSleep() also uses it as a wall-clock marker.
108 private Instant tstart;
// presumably the outcomes most recently seen by the start and complete callbacks - confirm
110 private OperationOutcome opstart;
111 private OperationOutcome opend;
// Operator configuration built in setUp() around the same executor as params.
113 private OperatorConfig config;
116 * Attaches the appender to the logger.
// One-time setup: binds the extraction appender to the logger's context and registers it
// on the OperationPartial logger so that tests can inspect what gets logged.
119 public static void setUpBeforeClass() throws Exception {
121 * Attach appender to the logger.
123 appender.setContext(logger.getLoggerContext());
126 logger.addAppender(appender);
130 * Stops the appender.
// One-time teardown, the counterpart of setUpBeforeClass().
133 public static void tearDownAfterClass() {
138 * Initializes the fields, including {@link #oper}.
// Per-test setup: builds a fresh event carrying REQ_ID, its context, a PseudoExecutor, and
// operation parameters whose start/complete callbacks are this class's starter()/completer().
141 public void setUp() {
142 event = new VirtualControlLoopEvent();
143 event.setRequestId(REQ_ID);
145 context = new ControlLoopEventContext(event);
// PseudoExecutor queues tasks; tests drain the queue explicitly via runAll()
146 executor = new PseudoExecutor();
148 params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
149 .executor(executor).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
150 .startCallback(this::starter).targetEntity(MY_SINK).build();
// the operator configuration shares the same executor as the parameters
152 config = new OperatorConfig(executor);
// Verifies the actor name, the operation name, and that the full name is "<actor>.<operation>".
163 public void testOperatorPartial_testGetActorName_testGetName() {
164 assertEquals(ACTOR, oper.getActorName());
165 assertEquals(OPERATION, oper.getName());
166 assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
// Submits a task to the blocking executor of a stand-alone OperatorPartial and waits
// (at most 5 seconds) for that task to complete the future.
170 public void testGetBlockingThread() throws Exception {
171 CompletableFuture<Void> future = new CompletableFuture<>();
173 // use the real executor
174 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
176 public Operation buildOperation(ControlLoopOperationParams params) {
181 oper2.getBlockingExecutor().execute(() -> future.complete(null));
// the future is completed with null, so get() returning null proves the task ran
183 assertNull(future.get(5, TimeUnit.SECONDS));
// Plain start with no guard: delegates to verifyRun(), expecting a SUCCESS result.
// The two counts are presumably the expected callback and operation counts - confirm in verifyRun().
187 public void testStart() {
188 verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
192 * Tests startOperation() when the operation has a preprocessor.
// Installs a guard future that counts its own invocations and yields a success outcome,
// runs the operation, and checks that the guard body executed exactly once.
195 public void testStartWithPreprocessor() {
196 AtomicInteger count = new AtomicInteger();
198 CompletableFuture<OperationOutcome> preproc = CompletableFuture.supplyAsync(() -> {
199 count.incrementAndGet();
200 return makeSuccess();
203 oper.setGuard(preproc);
205 verifyRun("testStartWithPreprocessor_testStartPreprocessor", 1, 1, PolicyResult.SUCCESS);
207 assertEquals(1, count.get());
211 * Tests start() with multiple running requests.
// Loops MAX_PARALLEL times, drains the executor, and expects MAX_PARALLEL start callbacks,
// operation invocations and completion callbacks, with a final SUCCESS.
214 public void testStartMultiple() {
215 for (int count = 0; count < MAX_PARALLEL; ++count) {
// the run limit is scaled by MAX_PARALLEL because several pipelines share the queue
219 assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
221 assertNotNull(opstart);
222 assertNotNull(opend);
223 assertEquals(PolicyResult.SUCCESS, opend.getResult());
225 assertEquals(MAX_PARALLEL, numStart);
226 assertEquals(MAX_PARALLEL, oper.getCount());
227 assertEquals(MAX_PARALLEL, numEnd);
231 * Tests startPreprocessor() when the preprocessor returns a failure.
// The guard completes with a FAILURE outcome; the run is expected to end as FAILURE_GUARD
// with a zero operation count.
234 public void testStartPreprocessorFailure() {
235 oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
237 verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
241 * Tests startPreprocessor() when the preprocessor throws an exception.
// The guard future fails with an IllegalStateException; the expected result is the same
// FAILURE_GUARD as for a guard that returns a failure outcome.
244 public void testStartPreprocessorException() {
245 // arrange for the preprocessor to throw an exception
246 oper.setGuard(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
248 verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
252 * Tests startPreprocessor() when the pipeline is not running.
// Cancels the pipeline immediately after starting it, then drains the executor and checks
// that no start callback, operation, or completion callback was counted.
255 public void testStartPreprocessorNotRunning() {
256 // arrange for the preprocessor to return success, which will be ignored
257 oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
// cancel before any queued task has had a chance to run
259 oper.start().cancel(false);
260 assertTrue(executor.runAll(MAX_REQUESTS));
265 assertEquals(0, numStart);
266 assertEquals(0, oper.getCount());
267 assertEquals(0, numEnd);
271 * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
// Overrides startPreprocessorAsync() so that building the preprocessor throws; start() must
// propagate the IllegalStateException and leave nothing queued on the executor.
274 public void testStartPreprocessorBuilderException() {
275 oper = new MyOper() {
277 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
278 throw new IllegalStateException(EXPECTED_EXCEPTION);
282 assertThatIllegalStateException().isThrownBy(() -> oper.start());
284 // should be nothing in the queue
285 assertEquals(0, executor.getQueueLength());
// With the fixture as configured, startPreprocessorAsync() is expected to return null.
289 public void testStartPreprocessorAsync() {
290 assertNull(oper.startPreprocessorAsync());
// With no guard installed by the test, startGuardAsync() is expected to return null.
294 public void testStartGuardAsync() {
295 assertNull(oper.startGuardAsync());
// Drains the executor and checks that the operation was invoked exactly once.
299 public void testStartOperationAsync() {
301 assertTrue(executor.runAll(MAX_REQUESTS));
303 assertEquals(1, oper.getCount());
// isSuccess() must be true for SUCCESS and false for every other PolicyResult value.
307 public void testIsSuccess() {
308 OperationOutcome outcome = new OperationOutcome();
310 outcome.setResult(PolicyResult.SUCCESS);
311 assertTrue(oper.isSuccess(outcome));
313 for (PolicyResult failure : FAILURE_RESULTS) {
314 outcome.setResult(failure);
// the message names the failing enum value if the assertion trips
315 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
// isActorFailed() must be true only when the outcome is non-null, has result FAILURE, and
// names this operation's actor and operation; each mismatch is checked in turn.
320 public void testIsActorFailed() {
321 assertFalse(oper.isActorFailed(null));
323 OperationOutcome outcome = params.makeOutcome();
// results other than FAILURE never count as an actor failure
326 outcome.setResult(PolicyResult.SUCCESS);
327 assertFalse(oper.isActorFailed(outcome));
329 outcome.setResult(PolicyResult.FAILURE_RETRIES);
330 assertFalse(oper.isActorFailed(outcome));
333 outcome.setResult(PolicyResult.FAILURE);
// incorrect actor
336 outcome.setActor(MY_SINK);
337 assertFalse(oper.isActorFailed(outcome));
338 outcome.setActor(null);
339 assertFalse(oper.isActorFailed(outcome));
340 outcome.setActor(ACTOR);
342 // incorrect operation
343 outcome.setOperation(MY_SINK);
344 assertFalse(oper.isActorFailed(outcome));
345 outcome.setOperation(null);
346 assertFalse(oper.isActorFailed(outcome));
347 outcome.setOperation(OPERATION);
// result, actor and operation all match now
350 assertTrue(oper.isActorFailed(outcome));
// Uses a bare OperationPartial with no overrides and expects the run to finish with
// FAILURE_EXCEPTION.
354 public void testDoOperation() {
356 * Use an operation that doesn't override doOperation().
358 OperationPartial oper2 = new OperationPartial(params, config) {};
361 assertTrue(executor.runAll(MAX_REQUESTS));
363 assertNotNull(opend);
364 assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
// Runs the operation on a real thread pool with shortened timeouts and expects the overall
// result to be FAILURE_TIMEOUT.
368 public void testTimeout() throws Exception {
370 // use a real executor
371 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
373 // trigger timeout very quickly
374 oper = new MyOper() {
376 protected long getTimeoutMs(Integer timeoutSec) {
381 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
383 OperationOutcome outcome2 = params.makeOutcome();
384 outcome2.setResult(PolicyResult.SUCCESS);
387 * Create an incomplete future that will timeout after the operation's
388 * timeout. If it fires before the other timer, then it will return a
391 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
// NOTE(review): outcome2 is built and set to SUCCESS above, but the lambda returns the
// method parameter "outcome" - this looks like it was meant to be outcome2; confirm.
392 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
393 params.getExecutor());
// NOTE(review): get() has no timeout, so a pipeline that never completes would hang the test.
399 assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
403 * Tests retry functions, when the count is set to zero and retries are exhausted.
// Retry count of zero: even with up to 10 failures allowed on the test operation, the run
// is expected to end as a plain FAILURE after a single attempt.
406 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
407 params = params.toBuilder().retry(0).build();
409 // new params, thus need a new operation
412 oper.setMaxFailures(10);
414 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
418 * Tests retry functions, when the count is null and retries are exhausted.
// Retry count of null: expected to behave like zero retries, i.e. a plain FAILURE after a
// single attempt.
421 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
422 params = params.toBuilder().retry(null).build();
424 // new params, thus need a new operation
427 oper.setMaxFailures(10);
429 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
433 * Tests retry functions, when retries are exhausted.
// Three retries allowed while the operation may fail up to 10 times: expects
// maxRetries + 1 attempts (the initial one plus the retries) and a FAILURE_RETRIES result.
436 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
437 final int maxRetries = 3;
438 params = params.toBuilder().retry(maxRetries).build();
440 // new params, thus need a new operation
443 oper.setMaxFailures(10);
445 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
446 PolicyResult.FAILURE_RETRIES);
450 * Tests retry functions, when a success follows some retries.
// Ten retries allowed but the operation may fail only three times: expects
// maxFailures + 1 attempts and a final SUCCESS.
453 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
454 params = params.toBuilder().retry(10).build();
456 // new params, thus need a new operation
459 final int maxFailures = 3;
460 oper.setMaxFailures(maxFailures);
462 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
463 PolicyResult.SUCCESS);
467 * Tests retry functions, when the outcome is {@code null}.
// Overrides doOperation() so the pipeline sees a null outcome; the run is expected to be
// reported as FAILURE.
470 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
472 // arrange to return null from doOperation()
473 oper = new MyOper() {
475 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
// still delegate to the parent; its return value is discarded here
478 super.doOperation(attempt, operation);
483 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
// sleep() with a negative or zero duration must return an already-completed future; a
// positive duration must not complete early, and must take roughly the requested time.
487 public void testSleep() throws Exception {
// negative duration: done immediately
488 CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
489 assertTrue(future.isDone());
490 assertNull(future.get());
// zero duration: done immediately
493 future = oper.sleep(0, TimeUnit.SECONDS);
494 assertTrue(future.isDone());
495 assertNull(future.get());
498 * Start a second sleep we can use to check the first while it's running.
500 tstart = Instant.now();
501 future = oper.sleep(100, TimeUnit.MILLISECONDS);
503 CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
505 // wait for second to complete and verify that the first has not completed
507 assertFalse(future.isDone());
509 // wait for first to complete (presumably - the elapsed-time check below needs the 100 ms sleep done)
// NOTE(review): elapsed time is taken from the wall clock (Instant.now()), which is not
// monotonic; a clock adjustment during the test could make the check below flaky.
512 long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
// 99 rather than 100 leaves one millisecond of slack
513 assertTrue(diff >= 99);
// isSameOperation() must be false for a null outcome or when either the actor or the
// operation differs, and true only when both match this operation.
517 public void testIsSameOperation() {
518 assertFalse(oper.isSameOperation(null));
520 OperationOutcome outcome = params.makeOutcome();
522 // wrong actor - should be false
523 outcome.setActor(null);
524 assertFalse(oper.isSameOperation(outcome));
525 outcome.setActor(MY_SINK);
526 assertFalse(oper.isSameOperation(outcome));
527 outcome.setActor(ACTOR);
529 // wrong operation - should be false
530 outcome.setOperation(null);
531 assertFalse(oper.isSameOperation(outcome));
532 outcome.setOperation(MY_SINK);
533 assertFalse(oper.isSameOperation(outcome));
534 outcome.setOperation(OPERATION);
// actor and operation both match now
536 assertTrue(oper.isSameOperation(outcome));
540 * Tests handleFailure() when the outcome is a success.
// A guard that completes with a success outcome must let the operation run to SUCCESS.
543 public void testHandlePreprocessorFailureTrue() {
544 oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
545 verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
549 * Tests handleFailure() when the outcome is <i>not</i> a success.
// A guard that completes with a failure outcome must yield FAILURE_GUARD with a zero
// operation count.
552 public void testHandlePreprocessorFailureFalse() throws Exception {
553 oper.setGuard(CompletableFuture.completedFuture(makeFailure()));
554 verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
558 * Tests handleFailure() when the outcome is {@code null}.
// A guard that completes with a null outcome is expected to be treated like a guard
// failure: FAILURE_GUARD with a zero operation count.
561 public void testHandlePreprocessorFailureNull() throws Exception {
562 // arrange to return null from the preprocessor
563 oper.setGuard(CompletableFuture.completedFuture(null));
565 verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
// Makes the test operation throw while running and expects a FAILURE_EXCEPTION result.
569 public void testFromException() {
570 // arrange to generate an exception when operation runs
571 oper.setGenException(true);
573 verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
577 * Tests fromException() when there is no exception.
// Control case for testFromException(): with no exception arranged, the result is SUCCESS.
580 public void testFromExceptionNoExcept() {
581 verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
585 * Tests both flavors of anyOf(), because one invokes the other.
// Exercises anyOf() in list and array form: whichever supplier yields an already-completed
// future, the combined future must complete with that same outcome object. The task list
// also includes a supplier that returns null and suppliers whose futures never complete.
// Each scenario presumably starts from an emptied task list - confirm.
588 public void testAnyOf() throws Exception {
589 // first task completes, others do not
590 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
592 final OperationOutcome outcome = params.makeOutcome();
594 tasks.add(() -> CompletableFuture.completedFuture(outcome));
595 tasks.add(() -> new CompletableFuture<>());
596 tasks.add(() -> null);
597 tasks.add(() -> new CompletableFuture<>());
599 CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
600 assertTrue(executor.runAll(MAX_REQUESTS));
601 assertTrue(result.isDone());
602 assertSame(outcome, result.get());
604 // repeat using array form
605 @SuppressWarnings("unchecked")
606 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
607 result = oper.anyOf(tasks.toArray(taskArray));
608 assertTrue(executor.runAll(MAX_REQUESTS));
609 assertTrue(result.isDone());
610 assertSame(outcome, result.get());
612 // second task completes, others do not
614 tasks.add(() -> new CompletableFuture<>());
615 tasks.add(() -> CompletableFuture.completedFuture(outcome));
616 tasks.add(() -> new CompletableFuture<>());
618 result = oper.anyOf(tasks);
619 assertTrue(executor.runAll(MAX_REQUESTS));
620 assertTrue(result.isDone());
621 assertSame(outcome, result.get());
623 // third task completes, others do not
625 tasks.add(() -> new CompletableFuture<>());
626 tasks.add(() -> new CompletableFuture<>());
627 tasks.add(() -> CompletableFuture.completedFuture(outcome));
629 result = oper.anyOf(tasks);
630 assertTrue(executor.runAll(MAX_REQUESTS));
631 assertTrue(result.isDone());
632 assertSame(outcome, result.get());
636 * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
// Edge cases for anyOf(): with no tasks it returns null; with a single task it returns
// that task's own future rather than a wrapper.
639 @SuppressWarnings("unchecked")
640 public void testAnyOfEdge() throws Exception {
641 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
643 // zero items: check both using a list and using an array
644 assertNull(oper.anyOf(tasks));
645 assertNull(oper.anyOf());
647 // one item: check both using a list and using an array
648 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
649 tasks.add(() -> future1);
651 assertSame(future1, oper.anyOf(tasks));
652 assertSame(future1, oper.anyOf(() -> future1));
// Array form of allOf(): the combined future must stay incomplete until every non-null
// task future has completed, regardless of completion order (1, then 3, then 2).
656 public void testAllOfArray() throws Exception {
657 final OperationOutcome outcome = params.makeOutcome();
659 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
660 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
661 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
// one supplier deliberately returns null to show that null futures are tolerated
663 @SuppressWarnings("unchecked")
664 CompletableFuture<OperationOutcome> result =
665 oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
667 assertTrue(executor.runAll(MAX_REQUESTS));
668 assertFalse(result.isDone());
669 future1.complete(outcome);
671 // complete 3 before 2
672 assertTrue(executor.runAll(MAX_REQUESTS));
673 assertFalse(result.isDone());
674 future3.complete(outcome);
676 assertTrue(executor.runAll(MAX_REQUESTS));
677 assertFalse(result.isDone());
678 future2.complete(outcome);
680 // all of them are now done
681 assertTrue(executor.runAll(MAX_REQUESTS));
682 assertTrue(result.isDone());
683 assertSame(outcome, result.get());
// List form of allOf(): same scenario as the array test - the combined future completes
// only after all three real futures have completed, in the order 1, 3, 2.
687 public void testAllOfList() throws Exception {
688 final OperationOutcome outcome = params.makeOutcome();
690 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
691 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
692 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
694 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
695 tasks.add(() -> future1);
696 tasks.add(() -> future2);
// a supplier returning null must be tolerated
697 tasks.add(() -> null);
698 tasks.add(() -> future3);
700 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
702 assertTrue(executor.runAll(MAX_REQUESTS));
703 assertFalse(result.isDone());
704 future1.complete(outcome);
706 // complete 3 before 2
707 assertTrue(executor.runAll(MAX_REQUESTS));
708 assertFalse(result.isDone());
709 future3.complete(outcome);
711 assertTrue(executor.runAll(MAX_REQUESTS));
712 assertFalse(result.isDone());
713 future2.complete(outcome);
715 // all of them are now done
716 assertTrue(executor.runAll(MAX_REQUESTS));
717 assertTrue(result.isDone());
718 assertSame(outcome, result.get());
722 * Tests both flavors of allOf(), for edge cases: zero items, and one item.
// Edge cases for allOf(): with no tasks it returns null; with a single task it returns
// that task's own future rather than a wrapper.
725 @SuppressWarnings("unchecked")
726 public void testAllOfEdge() throws Exception {
727 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
729 // zero items: check both using a list and using an array
730 assertNull(oper.allOf(tasks));
731 assertNull(oper.allOf());
733 // one item: check both using a list and using an array
734 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
735 tasks.add(() -> future1);
737 assertSame(future1, oper.allOf(tasks));
738 assertSame(future1, oper.allOf(() -> future1));
// When a supplier throws while the futures are being collected, anyOf() must rethrow the
// exception and cancel the futures obtained so far; futures from later suppliers are untouched.
742 public void testAttachFutures() throws Exception {
743 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
745 // third task throws an exception during construction
746 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
747 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
748 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
749 tasks.add(() -> future1);
750 tasks.add(() -> future2);
752 throw new IllegalStateException(EXPECTED_EXCEPTION);
754 tasks.add(() -> future3);
756 assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
758 // should have canceled the first two, but not the last
759 assertTrue(future1.isCancelled());
760 assertTrue(future2.isCancelled());
761 assertFalse(future3.isCancelled());
// Checks how allOf() combines several outcomes: the expected outcome is selected by index
// through verifyOutcomes(), a null outcome wins over successes, and a failed future makes
// the combined future complete exceptionally.
765 public void testCombineOutcomes() throws Exception {
767 verifyOutcomes(0, PolicyResult.SUCCESS);
768 verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
770 // maximum is in different positions
771 verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
772 verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
773 verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
775 // null outcome - takes precedence over a success
776 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
777 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
778 tasks.add(() -> CompletableFuture.completedFuture(null));
779 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
780 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
782 assertTrue(executor.runAll(MAX_REQUESTS));
783 assertTrue(result.isDone());
784 assertNull(result.get());
786 // one throws an exception during execution
787 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
790 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
791 tasks.add(() -> CompletableFuture.failedFuture(except));
792 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
793 result = oper.allOf(tasks);
795 assertTrue(executor.runAll(MAX_REQUESTS));
796 assertTrue(result.isCompletedExceptionally());
// NOTE(review): an AssertionError thrown inside a whenComplete() action is captured by the
// stage that whenComplete() returns, and that stage is discarded here - so this assertion
// can never fail the test. Consider asserting on the cause of the exception from result.get().
797 result.whenComplete((unused, thrown) -> assertSame(except, thrown));
801 * Tests both flavors of sequence(), because one invokes the other.
// Exercises sequence() in list and array form: with all-success tasks (plus one null
// supplier) the result is the shared success outcome; when a task yields FAILURE, the
// sequence's result is that failure outcome.
804 public void testSequence() throws Exception {
805 final OperationOutcome outcome = params.makeOutcome();
807 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
808 tasks.add(() -> CompletableFuture.completedFuture(outcome));
809 tasks.add(() -> null);
810 tasks.add(() -> CompletableFuture.completedFuture(outcome));
811 tasks.add(() -> CompletableFuture.completedFuture(outcome));
813 CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
814 assertTrue(executor.runAll(MAX_REQUESTS));
815 assertTrue(result.isDone());
816 assertSame(outcome, result.get());
818 // repeat using array form
819 @SuppressWarnings("unchecked")
820 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
821 result = oper.sequence(tasks.toArray(taskArray));
822 assertTrue(executor.runAll(MAX_REQUESTS));
823 assertTrue(result.isDone());
824 assertSame(outcome, result.get());
826 // second task fails, third should not run
827 OperationOutcome failure = params.makeOutcome();
828 failure.setResult(PolicyResult.FAILURE);
830 tasks.add(() -> CompletableFuture.completedFuture(outcome));
831 tasks.add(() -> CompletableFuture.completedFuture(failure));
832 tasks.add(() -> CompletableFuture.completedFuture(outcome));
834 result = oper.sequence(tasks);
835 assertTrue(executor.runAll(MAX_REQUESTS));
836 assertTrue(result.isDone());
// NOTE(review): only the returned outcome is checked; nothing here verifies that the
// third supplier was never invoked, as the comment above claims.
837 assertSame(failure, result.get());
841 * Tests both flavors of sequence(), for edge cases: zero items, and one item.
// Edge cases for sequence(): with no tasks it returns null; with a single task it returns
// that task's own future rather than a wrapper.
844 @SuppressWarnings("unchecked")
845 public void testSequenceEdge() throws Exception {
846 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
848 // zero items: check both using a list and using an array
849 assertNull(oper.sequence(tasks));
850 assertNull(oper.sequence());
852 // one item: check both using a list and using an array
853 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
854 tasks.add(() -> future1);
856 assertSame(future1, oper.sequence(tasks));
857 assertSame(future1, oper.sequence(() -> future1));
// Helper: builds one already-completed task per given result, combines them with allOf(),
// and asserts that the combined future yields the outcome object at index "expected".
// expected - index into results of the outcome that allOf() should select
// results  - the result to assign to each task's outcome, in task order
860 private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
861 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
863 OperationOutcome expectedOutcome = null;
865 for (int count = 0; count < results.length; ++count) {
866 OperationOutcome outcome = params.makeOutcome();
867 outcome.setResult(results[count]);
868 tasks.add(() -> CompletableFuture.completedFuture(outcome));
// remember the instance itself so identity can be checked with assertSame()
870 if (count == expected) {
871 expectedOutcome = outcome;
875 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
877 assertTrue(executor.runAll(MAX_REQUESTS));
878 assertTrue(result.isDone());
879 assertSame(expectedOutcome, result.get());
// Checks the priority that detmPriority() assigns: 1 for a null outcome or a null result,
// and the value from the table below for each listed PolicyResult.
883 public void testDetmPriority() throws CoderException {
884 assertEquals(1, oper.detmPriority(null));
886 OperationOutcome outcome = params.makeOutcome();
// expected priority per result; 1 is not listed because it is asserted for the null cases
888 Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
889 PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
890 PolicyResult.FAILURE_EXCEPTION, 6);
892 for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
893 outcome.setResult(ent.getKey());
894 assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
898 * Test null result. We can't actually set it to null, because the set() method
899 * won't allow it. Instead, we decode it from a structure.
901 outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
902 assertEquals(1, oper.detmPriority(outcome));
906 * Tests callbackStarted() when the pipeline has already been stopped.
// Replaces the start callback with one that cancels the pipeline's future, then checks
// that the start count ends up at exactly one.
909 public void testCallbackStartedNotRunning() {
// holder so the callback can reach the future that start() returns later
910 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
913 * arrange to stop the controller when the start-callback is invoked, but capture
916 params = params.toBuilder().startCallback(oper -> {
918 future.get().cancel(false);
921 // new params, thus need a new operation
924 future.set(oper.start());
925 assertTrue(executor.runAll(MAX_REQUESTS));
927 // should have only run once
928 assertEquals(1, numStart);
932 * Tests callbackCompleted() when the pipeline has already been stopped.
// Replaces the start callback with one that cancels the pipeline's future, then checks
// that no completion callback was counted.
935 public void testCallbackCompletedNotRunning() {
// holder so the callback can reach the future that start() returns later
936 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
938 // arrange to stop the controller when the start-callback is invoked
939 params = params.toBuilder().startCallback(oper -> {
940 future.get().cancel(false);
943 // new params, thus need a new operation
946 future.set(oper.start());
947 assertTrue(executor.runAll(MAX_REQUESTS));
949 // should not have been set
951 assertEquals(0, numEnd);
// setOutcome(outcome, throwable): a TimeoutException wrapped in a CompletionException maps
// to FAILURE_TIMEOUT, any other exception to FAILURE_EXCEPTION; the message is FAILED_MSG
// in both cases.
955 public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
956 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
958 OperationOutcome outcome;
960 outcome = new OperationOutcome();
961 oper.setOutcome(outcome, timex);
962 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
963 assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
965 outcome = new OperationOutcome();
966 oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
967 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
968 assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
// setOutcome(outcome, result): SUCCESS yields SUCCESS_MSG, every other result yields
// FAILED_MSG; the result itself is stored unchanged.
972 public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
973 OperationOutcome outcome;
975 outcome = new OperationOutcome();
976 oper.setOutcome(outcome, PolicyResult.SUCCESS);
977 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
978 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
980 for (PolicyResult result : FAILURE_RESULTS) {
981 outcome = new OperationOutcome();
982 oper.setOutcome(outcome, result);
// result.toString() is the assertion message, identifying the failing enum value
983 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
984 assertEquals(result.toString(), result, outcome.getResult());
// isTimeout() must be true only for a TimeoutException, either bare or wrapped in exactly
// one CompletionException; other wrappers, deeper nesting, and a null cause are false.
989 public void testIsTimeout() {
990 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
992 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
993 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
994 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
995 assertFalse(oper.isTimeout(new CompletionException(null)));
996 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
998 assertTrue(oper.isTimeout(timex));
999 assertTrue(oper.isTimeout(new CompletionException(timex)));
// Verifies what logMessage() writes for structured data, a plain string and null, in both
// directions (OUT to the sink, IN from the source), and that a coder failure produces an
// extra "cannot pretty-print" line before the message itself.
1003 public void testLogMessage() {
1004 final String infraStr = SINK_INFRA.toString();
1006 // log structured data
1007 appender.clearExtractions();
1008 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1009 List<String> output = appender.getExtracted();
1010 assertEquals(1, output.size());
1012 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1013 .contains("{\n \"text\": \"my-text\"\n}");
1015 // repeat with a response
1016 appender.clearExtractions();
1017 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1018 output = appender.getExtracted();
1019 assertEquals(1, output.size());
1021 assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1022 .contains("{\n \"text\": \"my-text\"\n}");
1024 // log a plain string
1025 appender.clearExtractions();
1026 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1027 output = appender.getExtracted();
1028 assertEquals(1, output.size());
1029 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1031 // log a null request
1032 appender.clearExtractions();
1033 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1034 output = appender.getExtracted();
1035 assertEquals(1, output.size());
1037 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1039 // generate exception from coder
1040 setOperCoderException();
// two lines are expected from here on: the pretty-print warning, then the message
1042 appender.clearExtractions();
1043 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1044 output = appender.getExtracted();
1045 assertEquals(2, output.size());
1046 assertThat(output.get(0)).contains("cannot pretty-print request");
1047 assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1049 // repeat with a response
1050 appender.clearExtractions();
1051 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1052 output = appender.getExtracted();
1053 assertEquals(2, output.size());
1054 assertThat(output.get(0)).contains("cannot pretty-print response");
1055 assertThat(output.get(1)).contains(MY_SOURCE);
// Verifies oper.getRetry(): a null retry count yields 0, while a supplied value (10) is
// returned unchanged.
1059 public void testGetRetry() {
1060 assertEquals(0, oper.getRetry(null));
1061 assertEquals(10, oper.getRetry(10));
// Verifies that an OperationPartial which does not override getRetryWaitMs() reports
// OperationPartial.DEFAULT_RETRY_WAIT_MS.
1065 public void testGetRetryWait() {
1066 // need an operator that doesn't override the retry time
1067 OperationPartial oper2 = new OperationPartial(params, config) {};
1068 assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
// Verifies oper.getTimeoutMs(): the timeout taken from params is expected to come back
// as TIMEOUT * 1000, and a null timeout is expected to come back as 0.
1072 public void testGetTimeOutMs() {
1073 assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
// rebuild the parameters with no timeout configured
1075 params = params.toBuilder().timeoutSec(null).build();
1077 // new params, thus need a new operation
1078 oper = new MyOper();
1080 assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
// Captures the start time of the given outcome in "tstart"; verifyRun() later checks that
// the recorded outcomes carry this same start-time instance.
1083 private void starter(OperationOutcome oper) {
1085 tstart = oper.getStart();
// NOTE(review): presumably the completion-time counterpart of starter(), receiving the
// outcome of a finished operation -- confirm against the method body.
1089 private void completer(OperationOutcome oper) {
1095 * Gets a function that does nothing.
1097 * @param <T> type of input parameter expected by the function
1098 * @return a function that does nothing
1100 private <T> Consumer<T> noop() {
// used by the four-argument verifyRun() as the "manipulator" when the future returned by
// oper.start() needs no adjustment
// Creates an outcome via params.makeOutcome() and sets its result to PolicyResult.SUCCESS.
1105 private OperationOutcome makeSuccess() {
1106 OperationOutcome outcome = params.makeOutcome();
1107 outcome.setResult(PolicyResult.SUCCESS);
// Creates an outcome via params.makeOutcome() and sets its result to PolicyResult.FAILURE.
1112 private OperationOutcome makeFailure() {
1113 OperationOutcome outcome = params.makeOutcome();
1114 outcome.setResult(PolicyResult.FAILURE);
1122 * @param testName test name
1123 * @param expectedCallbacks number of callbacks expected
1124 * @param expectedOperations number of operation invocations expected
1125 * @param expectedResult expected outcome
1127 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1128 PolicyResult expectedResult) {
// Derive the expected sub request ID: null when the run is expected to end in
// FAILURE_EXCEPTION, otherwise the string form of the expected operation count.
1130 String expectedSubRequestId =
1131 (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
// delegate to the full variant, leaving the future untouched
1133 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1139 * @param testName test name
1140 * @param expectedCallbacks number of callbacks expected
1141 * @param expectedOperations number of operation invocations expected
1142 * @param expectedResult expected outcome
1143 * @param expectedSubRequestId expected sub request ID
1144 * @param manipulator function to modify the future returned by
1145 * {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1146 * in the executor are run
1148 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1149 String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
// start the operation; nothing queued in the executor has been run yet
1151 CompletableFuture<OperationOutcome> future = oper.start();
// let the caller adjust the future before the queued tasks are run
1153 manipulator.accept(future);
// run the queued tasks; runAll() must report success within MAX_REQUESTS
1155 assertTrue(testName, executor.runAll(MAX_REQUESTS));
// both callback counters must equal the expected number of callbacks
1157 assertEquals(testName, expectedCallbacks, numStart);
1158 assertEquals(testName, expectedCallbacks, numEnd);
// the remaining checks on opstart/opend only make sense if callbacks were expected
1160 if (expectedCallbacks > 0) {
1161 assertNotNull(testName, opstart);
1162 assertNotNull(testName, opend);
1163 assertEquals(testName, expectedResult, opend.getResult());
// both recorded outcomes must carry the very same start-time instance held in tstart
1165 assertSame(testName, tstart, opstart.getStart());
1166 assertSame(testName, tstart, opend.getStart());
// the future must already be complete and must yield the same object as opend
1169 assertTrue(future.isDone());
1170 assertSame(testName, opend, future.get());
// checked exceptions from future.get() are rethrown unchecked, with the cause preserved
1172 } catch (InterruptedException | ExecutionException e) {
1173 throw new IllegalStateException(e);
// the sub request ID is only checked when at least one operation was expected
1176 if (expectedOperations > 0) {
1177 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
// finally, oper.getCount() must match the expected number of operation invocations
1181 assertEquals(testName, expectedOperations, oper.getCount());
1185 * Creates a new {@link #oper} whose coder will throw an exception.
1187 private void setOperCoderException() {
// replace "oper" with an anonymous MyOper subclass that overrides makeCoder()
1188 oper = new MyOper() {
1190 protected Coder makeCoder() {
// the returned StandardCoder overrides encode(Object, boolean) to throw a CoderException
// carrying EXPECTED_EXCEPTION
1191 return new StandardCoder() {
1193 public String encode(Object object, boolean pretty) throws CoderException {
1194 throw new CoderException(EXPECTED_EXCEPTION);
// Simple payload with a single "text" field initialised to TEXT; testLogMessage() passes
// instances of it to logMessage() as the structured data to be logged.
1203 public static class MyData {
1204 private String text = TEXT;
1208 private class MyOper extends OperationPartial {
1210 private int count = 0;
1213 private boolean genException;
1216 private int maxFailures = 0;
1219 private CompletableFuture<OperationOutcome> guard;
1223 super(OperationPartialTest.this.params, config);
// Test implementation of the operation: it can throw, or it stamps the attempt number on
// the outcome and sets a SUCCESS or FAILURE result.
1227 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
// simulated failure of the operation itself
1230 throw new IllegalStateException(EXPECTED_EXCEPTION);
// the attempt number becomes the sub request ID; verifyRun() checks this value
1233 operation.setSubRequestId(String.valueOf(attempt));
// SUCCESS is set once "count" exceeds "maxFailures"; FAILURE is the other result set here
1235 if (count > maxFailures) {
1236 operation.setResult(PolicyResult.SUCCESS);
1238 operation.setResult(PolicyResult.FAILURE);
// Returns the "guard" future when one has been set on this instance; otherwise defers to
// the superclass implementation.
1245 protected CompletableFuture<OperationOutcome> startGuardAsync() {
1246 return (guard != null ? guard : super.startGuardAsync());
1250 protected long getRetryWaitMs() {
1252 * Sleep timers run in the background, but we want to control things via the
1253 * "executor", thus we avoid sleep timers altogether by simply returning 0.