2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.actorserviceprovider.impl;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertFalse;
28 import static org.junit.jupiter.api.Assertions.assertNotNull;
29 import static org.junit.jupiter.api.Assertions.assertNull;
30 import static org.junit.jupiter.api.Assertions.assertSame;
31 import static org.junit.jupiter.api.Assertions.assertTrue;
33 import ch.qos.logback.classic.Logger;
34 import java.time.Instant;
35 import java.util.ArrayDeque;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import java.util.Deque;
39 import java.util.LinkedList;
40 import java.util.List;
42 import java.util.Map.Entry;
43 import java.util.UUID;
44 import java.util.concurrent.CompletableFuture;
45 import java.util.concurrent.CompletionException;
46 import java.util.concurrent.ExecutionException;
47 import java.util.concurrent.ForkJoinPool;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import java.util.concurrent.atomic.AtomicReference;
52 import java.util.function.Consumer;
53 import java.util.function.Supplier;
56 import org.junit.jupiter.api.AfterAll;
57 import org.junit.jupiter.api.BeforeAll;
58 import org.junit.jupiter.api.BeforeEach;
59 import org.junit.jupiter.api.Test;
60 import org.junit.jupiter.api.extension.ExtendWith;
61 import org.junit.runner.RunWith;
62 import org.mockito.Mock;
63 import org.mockito.junit.MockitoJUnitRunner;
64 import org.mockito.junit.jupiter.MockitoExtension;
65 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
66 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
67 import org.onap.policy.common.utils.coder.Coder;
68 import org.onap.policy.common.utils.coder.CoderException;
69 import org.onap.policy.common.utils.coder.StandardCoder;
70 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
71 import org.onap.policy.common.utils.time.PseudoExecutor;
72 import org.onap.policy.controlloop.ControlLoopOperation;
73 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
74 import org.onap.policy.controlloop.actorserviceprovider.Operation;
75 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
76 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
77 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
78 import org.onap.policy.controlloop.actorserviceprovider.Operator;
79 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
80 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
81 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
82 import org.slf4j.LoggerFactory;
84 @ExtendWith(MockitoExtension.class)
85 class OperationPartialTest {
86 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
87 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
88 private static final int MAX_REQUESTS = 100;
89 private static final int MAX_PARALLEL = 10;
90 private static final String EXPECTED_EXCEPTION = "expected exception";
91 private static final String ACTOR = "my-actor";
92 private static final String OPERATION = "my-operation";
93 private static final String MY_SINK = "my-sink";
94 private static final String MY_SOURCE = "my-source";
95 private static final String MY_TARGET_ENTITY = "my-entity";
96 private static final String TEXT = "my-text";
97 private static final int TIMEOUT = 1000;
98 private static final UUID REQ_ID = UUID.randomUUID();
100 private static final List<OperationResult> FAILURE_RESULTS = Arrays.stream(OperationResult.values())
101 .filter(result -> result != OperationResult.SUCCESS).toList();
104 * Used to attach an appender to the class' logger.
106 private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
107 private static final ExtractAppender appender = new ExtractAppender();
109 private static final List<String> PROP_NAMES = List.of("hello", "world");
112 private ActorService service;
114 private Actor guardActor;
116 private Operator guardOperator;
118 private Operation guardOperation;
120 private PseudoExecutor executor;
121 private ControlLoopOperationParams params;
125 private int numStart;
128 private Instant tstart;
130 private OperationOutcome opstart;
131 private OperationOutcome opend;
133 private Deque<OperationOutcome> starts;
134 private Deque<OperationOutcome> ends;
136 private OperatorConfig config;
139 * Attaches the appender to the logger.
142 static void setUpBeforeClass() throws Exception {
144 * Attach appender to the logger.
146 appender.setContext(logger.getLoggerContext());
149 logger.addAppender(appender);
153 * Stops the appender.
156 static void tearDownAfterClass() {
161 * Initializes the fields, including {@link #oper}.
165 executor = new PseudoExecutor();
167 params = ControlLoopOperationParams.builder().completeCallback(this::completer).requestId(REQ_ID)
168 .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
169 .startCallback(this::starter).build();
171 config = new OperatorConfig(executor);
180 starts = new ArrayDeque<>(10);
181 ends = new ArrayDeque<>(10);
185 void testOperatorPartial_testGetActorName_testGetName() {
186 assertEquals(ACTOR, oper.getActorName());
187 assertEquals(OPERATION, oper.getName());
188 assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
192 void testGetBlockingThread() throws Exception {
193 CompletableFuture<Void> future = new CompletableFuture<>();
195 // use the real executor
196 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
198 public Operation buildOperation(ControlLoopOperationParams params) {
203 oper2.getBlockingExecutor().execute(() -> future.complete(null));
205 assertNull(future.get(5, TimeUnit.SECONDS));
209 void testGetPropertyNames() {
210 assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
214 void testGetProperty_testSetProperty_testGetRequiredProperty() {
215 oper.setProperty("propertyA", "valueA");
216 oper.setProperty("propertyB", "valueB");
217 oper.setProperty("propertyC", 20);
218 oper.setProperty("propertyD", "valueD");
220 assertEquals("valueA", oper.getProperty("propertyA"));
221 assertEquals("valueB", oper.getProperty("propertyB"));
222 assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
224 assertEquals("valueD", oper.getRequiredProperty("propertyD", "typeD"));
226 assertThatIllegalStateException().isThrownBy(() -> oper.getRequiredProperty("propertyUnknown", "some type"))
227 .withMessage("missing some type");
232 verifyRun("testStart", 1, 1, OperationResult.SUCCESS);
236 * Tests start() with multiple running requests.
239 void testStartMultiple() {
240 for (int count = 0; count < MAX_PARALLEL; ++count) {
244 assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
246 assertNotNull(opstart);
247 assertNotNull(opend);
248 assertEquals(OperationResult.SUCCESS, opend.getResult());
250 assertEquals(MAX_PARALLEL, numStart);
251 assertEquals(MAX_PARALLEL, oper.getCount());
252 assertEquals(MAX_PARALLEL, numEnd);
256 void testStartOperationAsync() {
258 assertTrue(executor.runAll(MAX_REQUESTS));
260 assertEquals(1, oper.getCount());
264 void testIsSuccess() {
265 assertFalse(oper.isSuccess(null));
267 OperationOutcome outcome = new OperationOutcome();
269 outcome.setResult(OperationResult.SUCCESS);
270 assertTrue(oper.isSuccess(outcome));
272 for (OperationResult failure : FAILURE_RESULTS) {
273 outcome.setResult(failure);
274 assertFalse(oper.isSuccess(outcome), "testIsSuccess-" + failure);
279 void testIsActorFailed() {
280 assertFalse(oper.isActorFailed(null));
282 OperationOutcome outcome = params.makeOutcome();
285 outcome.setResult(OperationResult.SUCCESS);
286 assertFalse(oper.isActorFailed(outcome));
288 outcome.setResult(OperationResult.FAILURE_RETRIES);
289 assertFalse(oper.isActorFailed(outcome));
292 outcome.setResult(OperationResult.FAILURE);
295 outcome.setActor(MY_SINK);
296 assertFalse(oper.isActorFailed(outcome));
297 outcome.setActor(null);
298 assertFalse(oper.isActorFailed(outcome));
299 outcome.setActor(ACTOR);
301 // incorrect operation
302 outcome.setOperation(MY_SINK);
303 assertFalse(oper.isActorFailed(outcome));
304 outcome.setOperation(null);
305 assertFalse(oper.isActorFailed(outcome));
306 outcome.setOperation(OPERATION);
309 assertTrue(oper.isActorFailed(outcome));
313 void testDoOperation() {
315 * Use an operation that doesn't override doOperation().
317 OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
320 assertTrue(executor.runAll(MAX_REQUESTS));
322 assertNotNull(opend);
323 assertEquals(OperationResult.FAILURE_EXCEPTION, opend.getResult());
327 void testTimeout() throws Exception {
329 // use a real executor
330 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
332 // trigger timeout very quickly
333 oper = new MyOper() {
335 protected long getTimeoutMs(Integer timeoutSec) {
340 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
342 OperationOutcome outcome2 = params.makeOutcome();
343 outcome2.setResult(OperationResult.SUCCESS);
346 * Create an incomplete future that will timeout after the operation's
347 * timeout. If it fires before the other timer, then it will return a
350 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
351 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
352 params.getExecutor());
358 assertEquals(OperationResult.FAILURE_TIMEOUT, oper.start().get().getResult());
362 * Tests retry functions, when the count is set to zero and retries are exhausted.
365 void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
366 params = params.toBuilder().retry(0).build();
368 // new params, thus need a new operation
371 oper.setMaxFailures(10);
373 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, OperationResult.FAILURE);
377 * Tests retry functions, when the count is null and retries are exhausted.
380 void testSetRetryFlag_testRetryOnFailure_NullRetries() {
381 params = params.toBuilder().retry(null).build();
383 // new params, thus need a new operation
386 oper.setMaxFailures(10);
388 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, OperationResult.FAILURE);
392 * Tests retry functions, when retries are exhausted.
395 void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
396 final int maxRetries = 3;
397 params = params.toBuilder().retry(maxRetries).build();
399 // new params, thus need a new operation
402 oper.setMaxFailures(10);
404 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
405 OperationResult.FAILURE_RETRIES);
409 * Tests retry functions, when a success follows some retries.
412 void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
413 params = params.toBuilder().retry(10).build();
415 // new params, thus need a new operation
418 final int maxFailures = 3;
419 oper.setMaxFailures(maxFailures);
421 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
422 OperationResult.SUCCESS);
426 * Tests retry functions, when the outcome is {@code null}.
429 void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
431 // arrange to return null from doOperation()
432 oper = new MyOper() {
434 protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
437 super.doOperation(attempt, outcome);
442 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, OperationResult.FAILURE, noop());
446 void testSleep() throws Exception {
447 CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
448 assertTrue(future.isDone());
449 assertNull(future.get());
452 future = oper.sleep(0, TimeUnit.SECONDS);
453 assertTrue(future.isDone());
454 assertNull(future.get());
457 * Start a second sleep we can use to check the first while it's running.
459 tstart = Instant.now();
460 future = oper.sleep(100, TimeUnit.MILLISECONDS);
462 CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
464 // wait for second to complete and verify that the first has not completed
466 assertFalse(future.isDone());
468 // wait for second to complete
471 long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
472 assertTrue(diff >= 99);
476 void testIsSameOperation() {
477 assertFalse(oper.isSameOperation(null));
479 OperationOutcome outcome = params.makeOutcome();
481 // wrong actor - should be false
482 outcome.setActor(null);
483 assertFalse(oper.isSameOperation(outcome));
484 outcome.setActor(MY_SINK);
485 assertFalse(oper.isSameOperation(outcome));
486 outcome.setActor(ACTOR);
488 // wrong operation - should be null
489 outcome.setOperation(null);
490 assertFalse(oper.isSameOperation(outcome));
491 outcome.setOperation(MY_SINK);
492 assertFalse(oper.isSameOperation(outcome));
493 outcome.setOperation(OPERATION);
495 assertTrue(oper.isSameOperation(outcome));
499 void testFromException() {
500 // arrange to generate an exception when operation runs
501 oper.setGenException(true);
503 verifyRun("testFromException", 1, 1, OperationResult.FAILURE_EXCEPTION);
507 * Tests fromException() when there is no exception.
510 void testFromExceptionNoExcept() {
511 verifyRun("testFromExceptionNoExcept", 1, 1, OperationResult.SUCCESS);
515 * Tests both flavors of anyOf(), because one invokes the other.
518 void testAnyOf() throws Exception {
519 // first task completes, others do not
520 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
522 final OperationOutcome outcome = params.makeOutcome();
524 tasks.add(() -> CompletableFuture.completedFuture(outcome));
525 tasks.add(() -> new CompletableFuture<>());
526 tasks.add(() -> null);
527 tasks.add(() -> new CompletableFuture<>());
529 CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
530 assertTrue(executor.runAll(MAX_REQUESTS));
531 assertTrue(result.isDone());
532 assertSame(outcome, result.get());
534 // repeat using array form
535 @SuppressWarnings("unchecked")
536 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
537 result = oper.anyOf(tasks.toArray(taskArray));
538 assertTrue(executor.runAll(MAX_REQUESTS));
539 assertTrue(result.isDone());
540 assertSame(outcome, result.get());
542 // second task completes, others do not
544 tasks.add(() -> new CompletableFuture<>());
545 tasks.add(() -> CompletableFuture.completedFuture(outcome));
546 tasks.add(() -> new CompletableFuture<>());
548 result = oper.anyOf(tasks);
549 assertTrue(executor.runAll(MAX_REQUESTS));
550 assertTrue(result.isDone());
551 assertSame(outcome, result.get());
553 // third task completes, others do not
555 tasks.add(() -> new CompletableFuture<>());
556 tasks.add(() -> new CompletableFuture<>());
557 tasks.add(() -> CompletableFuture.completedFuture(outcome));
559 result = oper.anyOf(tasks);
560 assertTrue(executor.runAll(MAX_REQUESTS));
561 assertTrue(result.isDone());
562 assertSame(outcome, result.get());
566 * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
569 @SuppressWarnings("unchecked")
570 void testAnyOfEdge() throws Exception {
571 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
573 // zero items: check both using a list and using an array
574 assertNull(oper.anyOf(tasks));
575 assertNull(oper.anyOf());
577 // one item: : check both using a list and using an array
578 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
579 tasks.add(() -> future1);
581 assertSame(future1, oper.anyOf(tasks));
582 assertSame(future1, oper.anyOf(() -> future1));
586 void testAllOfArray() throws Exception {
587 final OperationOutcome outcome = params.makeOutcome();
589 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
590 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
591 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
593 @SuppressWarnings("unchecked")
594 CompletableFuture<OperationOutcome> result =
595 oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
597 assertTrue(executor.runAll(MAX_REQUESTS));
598 assertFalse(result.isDone());
599 future1.complete(outcome);
601 // complete 3 before 2
602 assertTrue(executor.runAll(MAX_REQUESTS));
603 assertFalse(result.isDone());
604 future3.complete(outcome);
606 assertTrue(executor.runAll(MAX_REQUESTS));
607 assertFalse(result.isDone());
608 future2.complete(outcome);
610 // all of them are now done
611 assertTrue(executor.runAll(MAX_REQUESTS));
612 assertTrue(result.isDone());
613 assertSame(outcome, result.get());
617 void testAllOfList() throws Exception {
618 final OperationOutcome outcome = params.makeOutcome();
620 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
621 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
622 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
624 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
625 tasks.add(() -> future1);
626 tasks.add(() -> future2);
627 tasks.add(() -> null);
628 tasks.add(() -> future3);
630 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
632 assertTrue(executor.runAll(MAX_REQUESTS));
633 assertFalse(result.isDone());
634 future1.complete(outcome);
636 // complete 3 before 2
637 assertTrue(executor.runAll(MAX_REQUESTS));
638 assertFalse(result.isDone());
639 future3.complete(outcome);
641 assertTrue(executor.runAll(MAX_REQUESTS));
642 assertFalse(result.isDone());
643 future2.complete(outcome);
645 // all of them are now done
646 assertTrue(executor.runAll(MAX_REQUESTS));
647 assertTrue(result.isDone());
648 assertSame(outcome, result.get());
652 * Tests both flavors of allOf(), for edge cases: zero items, and one item.
655 @SuppressWarnings("unchecked")
656 void testAllOfEdge() throws Exception {
657 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
659 // zero items: check both using a list and using an array
660 assertNull(oper.allOf(tasks));
661 assertNull(oper.allOf());
663 // one item: : check both using a list and using an array
664 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
665 tasks.add(() -> future1);
667 assertSame(future1, oper.allOf(tasks));
668 assertSame(future1, oper.allOf(() -> future1));
672 void testAttachFutures() throws Exception {
673 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
675 // third task throws an exception during construction
676 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
677 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
678 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
679 tasks.add(() -> future1);
680 tasks.add(() -> future2);
682 throw new IllegalStateException(EXPECTED_EXCEPTION);
684 tasks.add(() -> future3);
686 assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
688 // should have canceled the first two, but not the last
689 assertTrue(future1.isCancelled());
690 assertTrue(future2.isCancelled());
691 assertFalse(future3.isCancelled());
695 void testCombineOutcomes() throws Exception {
697 verifyOutcomes(0, OperationResult.SUCCESS);
698 verifyOutcomes(0, OperationResult.FAILURE_EXCEPTION);
700 // maximum is in different positions
701 verifyOutcomes(0, OperationResult.FAILURE, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD);
702 verifyOutcomes(1, OperationResult.SUCCESS, OperationResult.FAILURE, OperationResult.FAILURE_GUARD);
703 verifyOutcomes(2, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD, OperationResult.FAILURE);
705 // null outcome - takes precedence over a success
706 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
707 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
708 tasks.add(() -> CompletableFuture.completedFuture(null));
709 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
710 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
712 assertTrue(executor.runAll(MAX_REQUESTS));
713 assertTrue(result.isDone());
714 assertNull(result.get());
716 // one throws an exception during execution
717 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
720 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
721 tasks.add(() -> CompletableFuture.failedFuture(except));
722 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
723 result = oper.allOf(tasks);
725 assertTrue(executor.runAll(MAX_REQUESTS));
726 assertTrue(result.isCompletedExceptionally());
727 result.whenComplete((unused, thrown) -> assertSame(except, thrown));
731 * Tests both flavors of sequence(), because one invokes the other.
734 void testSequence() throws Exception {
735 final OperationOutcome outcome = params.makeOutcome();
737 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
738 tasks.add(() -> CompletableFuture.completedFuture(outcome));
739 tasks.add(() -> null);
740 tasks.add(() -> CompletableFuture.completedFuture(outcome));
741 tasks.add(() -> CompletableFuture.completedFuture(outcome));
743 CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
744 assertTrue(executor.runAll(MAX_REQUESTS));
745 assertTrue(result.isDone());
746 assertSame(outcome, result.get());
748 // repeat using array form
749 @SuppressWarnings("unchecked")
750 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
751 result = oper.sequence(tasks.toArray(taskArray));
752 assertTrue(executor.runAll(MAX_REQUESTS));
753 assertTrue(result.isDone());
754 assertSame(outcome, result.get());
756 // second task fails, third should not run
757 OperationOutcome failure = params.makeOutcome();
758 failure.setResult(OperationResult.FAILURE);
760 tasks.add(() -> CompletableFuture.completedFuture(outcome));
761 tasks.add(() -> CompletableFuture.completedFuture(failure));
762 tasks.add(() -> CompletableFuture.completedFuture(outcome));
764 result = oper.sequence(tasks);
765 assertTrue(executor.runAll(MAX_REQUESTS));
766 assertTrue(result.isDone());
767 assertSame(failure, result.get());
771 * Tests both flavors of sequence(), for edge cases: zero items, and one item.
774 @SuppressWarnings("unchecked")
775 void testSequenceEdge() throws Exception {
776 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
778 // zero items: check both using a list and using an array
779 assertNull(oper.sequence(tasks));
780 assertNull(oper.sequence());
782 // one item: : check both using a list and using an array
783 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
784 tasks.add(() -> future1);
786 assertSame(future1, oper.sequence(tasks));
787 assertSame(future1, oper.sequence(() -> future1));
790 private void verifyOutcomes(int expected, OperationResult... results) throws Exception {
791 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
793 OperationOutcome expectedOutcome = null;
795 for (int count = 0; count < results.length; ++count) {
796 OperationOutcome outcome = params.makeOutcome();
797 outcome.setResult(results[count]);
798 tasks.add(() -> CompletableFuture.completedFuture(outcome));
800 if (count == expected) {
801 expectedOutcome = outcome;
805 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
807 assertTrue(executor.runAll(MAX_REQUESTS));
808 assertTrue(result.isDone());
809 assertSame(expectedOutcome, result.get());
813 void testDetmPriority() throws CoderException {
814 assertEquals(1, oper.detmPriority(null));
816 OperationOutcome outcome = params.makeOutcome();
818 Map<OperationResult, Integer> map = Map.of(OperationResult.SUCCESS, 0, OperationResult.FAILURE_GUARD, 2,
819 OperationResult.FAILURE_RETRIES, 3, OperationResult.FAILURE, 4, OperationResult.FAILURE_TIMEOUT, 5,
820 OperationResult.FAILURE_EXCEPTION, 6);
822 for (Entry<OperationResult, Integer> ent : map.entrySet()) {
823 outcome.setResult(ent.getKey());
824 assertEquals(ent.getValue().intValue(), oper.detmPriority(outcome), ent.getKey().toString());
828 * Test null result. We can't actually set it to null, because the set() method
829 * won't allow it. Instead, we decode it from a structure.
831 outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
832 assertEquals(1, oper.detmPriority(outcome));
836 * Tests callbackStarted() when the pipeline has already been stopped.
839 void testCallbackStartedNotRunning() {
840 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
843 * arrange to stop the controller when the start-callback is invoked, but capture
846 params = params.toBuilder().startCallback(oper -> {
848 future.get().cancel(false);
851 // new params, thus need a new operation
854 future.set(oper.start());
855 assertTrue(executor.runAll(MAX_REQUESTS));
857 // should have only run once
858 assertEquals(1, numStart);
862 * Tests callbackCompleted() when the pipeline has already been stopped.
865 void testCallbackCompletedNotRunning() {
866 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
868 // arrange to stop the controller when the start-callback is invoked
869 params = params.toBuilder().startCallback(oper -> {
870 future.get().cancel(false);
873 // new params, thus need a new operation
876 future.set(oper.start());
877 assertTrue(executor.runAll(MAX_REQUESTS));
879 // should not have been set
881 assertEquals(0, numEnd);
885 void testSetOutcomeControlLoopOperationOutcomeThrowable() {
886 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
888 OperationOutcome outcome;
890 outcome = new OperationOutcome();
891 oper.setOutcome(outcome, timex);
892 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
893 assertEquals(OperationResult.FAILURE_TIMEOUT, outcome.getResult());
895 outcome = new OperationOutcome();
896 oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
897 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
898 assertEquals(OperationResult.FAILURE_EXCEPTION, outcome.getResult());
902 void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
903 OperationOutcome outcome;
905 outcome = new OperationOutcome();
906 oper.setOutcome(outcome, OperationResult.SUCCESS);
907 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
908 assertEquals(OperationResult.SUCCESS, outcome.getResult());
910 oper.setOutcome(outcome, OperationResult.SUCCESS);
911 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
912 assertEquals(OperationResult.SUCCESS, outcome.getResult());
914 for (OperationResult result : FAILURE_RESULTS) {
915 outcome = new OperationOutcome();
916 oper.setOutcome(outcome, result);
917 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage(), result.toString());
918 assertEquals(result, outcome.getResult(), result.toString());
923 void testMakeOutcome() {
924 oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, MY_TARGET_ENTITY);
925 assertEquals(MY_TARGET_ENTITY, oper.makeOutcome().getTarget());
929 void testIsTimeout() {
930 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
932 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
933 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
934 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
935 assertFalse(oper.isTimeout(new CompletionException(null)));
936 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
938 assertTrue(oper.isTimeout(timex));
939 assertTrue(oper.isTimeout(new CompletionException(timex)));
943 void testLogMessage() {
944 final String infraStr = SINK_INFRA.toString();
946 // log structured data
947 appender.clearExtractions();
948 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
949 List<String> output = appender.getExtracted();
950 assertEquals(1, output.size());
952 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
953 .contains("{\n \"text\": \"my-text\"\n}");
955 // repeat with a response
956 appender.clearExtractions();
957 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
958 output = appender.getExtracted();
959 assertEquals(1, output.size());
961 assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
962 .contains("{\n \"text\": \"my-text\"\n}");
964 // log a plain string
965 appender.clearExtractions();
966 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
967 output = appender.getExtracted();
968 assertEquals(1, output.size());
969 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
971 // log a null request
972 appender.clearExtractions();
973 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
974 output = appender.getExtracted();
975 assertEquals(1, output.size());
977 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
979 // generate exception from coder
980 setOperCoderException();
982 appender.clearExtractions();
983 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
984 output = appender.getExtracted();
985 assertEquals(2, output.size());
986 assertThat(output.get(0)).contains("cannot pretty-print request");
987 assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
989 // repeat with a response
990 appender.clearExtractions();
991 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
992 output = appender.getExtracted();
993 assertEquals(2, output.size());
994 assertThat(output.get(0)).contains("cannot pretty-print response");
995 assertThat(output.get(1)).contains(MY_SOURCE);
999 void testGetRetry() {
1000 assertEquals(0, oper.getRetry(null));
1001 assertEquals(10, oper.getRetry(10));
1005 void testGetRetryWait() {
1006 // need an operator that doesn't override the retry time
1007 OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1008 assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1012 void testGetTimeOutMs() {
1013 assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1015 params = params.toBuilder().timeoutSec(null).build();
1017 // new params, thus need a new operation
1018 oper = new MyOper();
1020 assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1023 private void starter(OperationOutcome oper) {
1025 tstart = oper.getStart();
1030 private void completer(OperationOutcome oper) {
1037 * Gets a function that does nothing.
1039 * @param <T> type of input parameter expected by the function
1040 * @return a function that does nothing
1042 private <T> Consumer<T> noop() {
1050 * @param testName test name
1051 * @param expectedCallbacks number of callbacks expected
1052 * @param expectedOperations number of operation invocations expected
1053 * @param expectedResult expected outcome
1055 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1056 OperationResult expectedResult) {
1058 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
1064 * @param testName test name
1065 * @param expectedCallbacks number of callbacks expected
1066 * @param expectedOperations number of operation invocations expected
1067 * @param expectedResult expected outcome
1068 * @param manipulator function to modify the future returned by
1069 * {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1070 * in the executor are run
1072 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1073 OperationResult expectedResult, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1081 CompletableFuture<OperationOutcome> future = oper.start();
1083 manipulator.accept(future);
1085 assertTrue(executor.runAll(MAX_REQUESTS), testName);
1087 assertEquals(expectedCallbacks, numStart, testName);
1088 assertEquals(expectedCallbacks, numEnd, testName);
1090 if (expectedCallbacks > 0) {
1091 assertNotNull(opstart, testName);
1092 assertNotNull(opend, testName);
1093 assertEquals(expectedResult, opend.getResult(), testName);
1095 assertSame(tstart, opstart.getStart(), testName);
1096 assertSame(tstart, opend.getStart(), testName);
1099 assertTrue(future.isDone());
1100 assertEquals(opend, future.get(), testName);
1102 // "start" is never final
1103 for (OperationOutcome outcome : starts) {
1104 assertFalse(outcome.isFinalOutcome(), testName);
1107 // only the last "complete" is final
1108 assertTrue(ends.removeLast().isFinalOutcome(), testName);
1110 for (OperationOutcome outcome : ends) {
1111 assertFalse(outcome.isFinalOutcome());
1114 } catch (InterruptedException | ExecutionException e) {
1115 throw new IllegalStateException(e);
1118 if (expectedOperations > 0) {
1119 assertNotNull(testName, oper.getSubRequestId());
1120 assertEquals(oper.getSubRequestId(), opstart.getSubRequestId(), testName + " op start");
1121 assertEquals(oper.getSubRequestId(), opend.getSubRequestId(), testName + " op end");
1125 assertEquals(expectedOperations, oper.getCount(), testName);
1129 * Creates a new {@link #oper} whose coder will throw an exception.
1131 private void setOperCoderException() {
1132 oper = new MyOper() {
1134 protected Coder getCoder() {
1135 return new StandardCoder() {
1137 public String encode(Object object, boolean pretty) throws CoderException {
1138 throw new CoderException(EXPECTED_EXCEPTION);
1147 static class MyData {
1148 private String text = TEXT;
1152 private class MyOper extends OperationPartial {
1154 private int count = 0;
1157 private boolean genException;
1159 private int maxFailures = 0;
1161 private CompletableFuture<OperationOutcome> preProc;
1165 super(OperationPartialTest.this.params, config, PROP_NAMES);
1169 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1172 throw new IllegalStateException(EXPECTED_EXCEPTION);
1175 operation.setSubRequestId(String.valueOf(attempt));
1177 if (count > maxFailures) {
1178 operation.setResult(OperationResult.SUCCESS);
1180 operation.setResult(OperationResult.FAILURE);
1187 protected long getRetryWaitMs() {
1189 * Sleep timers run in the background, but we want to control things via the
1190 * "executor", thus we avoid sleep timers altogether by simply returning 0.