2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.controlloop.actorserviceprovider.impl;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
33 import ch.qos.logback.classic.Logger;
34 import java.time.Instant;
35 import java.util.ArrayDeque;
36 import java.util.Arrays;
37 import java.util.Collections;
38 import java.util.Deque;
39 import java.util.LinkedList;
40 import java.util.List;
42 import java.util.Map.Entry;
43 import java.util.UUID;
44 import java.util.concurrent.CompletableFuture;
45 import java.util.concurrent.CompletionException;
46 import java.util.concurrent.ExecutionException;
47 import java.util.concurrent.ForkJoinPool;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import java.util.concurrent.atomic.AtomicReference;
52 import java.util.function.Consumer;
53 import java.util.function.Supplier;
56 import org.junit.AfterClass;
57 import org.junit.Before;
58 import org.junit.BeforeClass;
59 import org.junit.Test;
60 import org.junit.runner.RunWith;
61 import org.mockito.Mock;
62 import org.mockito.junit.MockitoJUnitRunner;
63 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
64 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
65 import org.onap.policy.common.utils.coder.Coder;
66 import org.onap.policy.common.utils.coder.CoderException;
67 import org.onap.policy.common.utils.coder.StandardCoder;
68 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
69 import org.onap.policy.common.utils.time.PseudoExecutor;
70 import org.onap.policy.controlloop.ControlLoopOperation;
71 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
72 import org.onap.policy.controlloop.actorserviceprovider.Operation;
73 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
74 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
75 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
76 import org.onap.policy.controlloop.actorserviceprovider.Operator;
77 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
78 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
79 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
80 import org.slf4j.LoggerFactory;
82 @RunWith(MockitoJUnitRunner.class)
83 public class OperationPartialTest {
// ---- constants shared by the tests ----
// both the sink and the source use the NOOP topic infrastructure
84 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
85 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
// upper bound handed to executor.runAll() by the tests
86 private static final int MAX_REQUESTS = 100;
// iteration count and expected number of runs in testStartMultiple()
87 private static final int MAX_PARALLEL = 10;
88 private static final String EXPECTED_EXCEPTION = "expected exception";
89 private static final String ACTOR = "my-actor";
90 private static final String OPERATION = "my-operation";
91 private static final String MY_SINK = "my-sink";
92 private static final String MY_SOURCE = "my-source";
93 private static final String MY_TARGET_ENTITY = "my-entity";
94 private static final String TEXT = "my-text";
// handed to timeoutSec() in setUp(), i.e. expressed in seconds
95 private static final int TIMEOUT = 1000;
96 private static final UUID REQ_ID = UUID.randomUUID();
// every OperationResult value other than SUCCESS
98 private static final List<OperationResult> FAILURE_RESULTS = Arrays.stream(OperationResult.values())
99 .filter(result -> result != OperationResult.SUCCESS).toList();
102 * Used to attach an appender to the class' logger.
104 private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
105 private static final ExtractAppender appender = new ExtractAppender();
// value that testGetPropertyNames() expects from getPropertyNames()
107 private static final List<String> PROP_NAMES = List.of("hello", "world");
// NOTE(review): no initializer is visible for the next four fields; presumably they are injected mocks - confirm
110 private ActorService service;
112 private Actor guardActor;
114 private Operator guardOperator;
116 private Operation guardOperation;
// executor and parameters are rebuilt for every test by setUp()
118 private PseudoExecutor executor;
119 private ControlLoopOperationParams params;
// compared against the expected number of callbacks in verifyRun()
123 private int numStart;
// assigned by starter() from the outcome's start time; also reused as a time stamp by testSleep()
126 private Instant tstart;
// NOTE(review): presumably the most recent outcomes seen by the start and completion callbacks - confirm
128 private OperationOutcome opstart;
129 private OperationOutcome opend;
// both deques are re-created by setUp()
131 private Deque<OperationOutcome> starts;
132 private Deque<OperationOutcome> ends;
// re-created by setUp() from the test's executor
134 private OperatorConfig config;
137 * Attaches the appender to the logger.
// Class-level setup: connects the shared ExtractAppender to the OperationPartial logger.
140 public static void setUpBeforeClass() throws Exception {
142 * Attach appender to the logger.
// give the appender the logger's own context, then register it with the logger
144 appender.setContext(logger.getLoggerContext());
147 logger.addAppender(appender);
151 * Stops the appender.
// Class-level teardown hook.
// NOTE(review): the body is not visible in this view; presumably it stops the appender - confirm
154 public static void tearDownAfterClass() {
159 * Initializes the fields, including {@link #oper}.
// Per-test setup: creates a fresh PseudoExecutor, builds the operation parameters and
// operator configuration with it, and re-creates the callback deques.
162 public void setUp() {
163 executor = new PseudoExecutor();
// the start and completion callbacks are routed to starter() and completer() of this test
165 params = ControlLoopOperationParams.builder().completeCallback(this::completer).requestId(REQ_ID)
166 .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
167 .startCallback(this::starter).build();
169 config = new OperatorConfig(executor);
// NOTE(review): further initialization between these statements is not visible in this view
178 starts = new ArrayDeque<>(10);
179 ends = new ArrayDeque<>(10);
// Verifies the actor name, the operation name, and the full name, which is expected to be
// "<actor>.<operation>".
183 public void testOperatorPartial_testGetActorName_testGetName() {
184 assertEquals(ACTOR, oper.getActorName());
185 assertEquals(OPERATION, oper.getName());
186 assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
// Verifies that a task handed to an OperatorPartial's blocking executor is actually run.
190 public void testGetBlockingThread() throws Exception {
191 CompletableFuture<Void> future = new CompletableFuture<>();
193 // use the real executor
194 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
196 public Operation buildOperation(ControlLoopOperationParams params) {
// the submitted task completes the future, which proves that it was executed
201 oper2.getBlockingExecutor().execute(() -> future.complete(null));
// get() throws a TimeoutException if the task has not run within five seconds
203 assertNull(future.get(5, TimeUnit.SECONDS));
// Verifies that the operation reports exactly the property names listed in PROP_NAMES.
207 public void testGetPropertyNames() {
208 assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
// Verifies that properties stored with setProperty() can be read back, and that
// getRequiredProperty() throws for a property that was never set.
212 public void testGetProperty_testSetProperty_testGetRequiredProperty() {
// store a mix of String and Integer values
213 oper.setProperty("propertyA", "valueA");
214 oper.setProperty("propertyB", "valueB");
215 oper.setProperty("propertyC", 20);
216 oper.setProperty("propertyD", "valueD");
218 assertEquals("valueA", oper.getProperty("propertyA"));
219 assertEquals("valueB", oper.getProperty("propertyB"));
220 assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
// a required property that exists is returned like any other
222 assertEquals("valueD", oper.getRequiredProperty("propertyD", "typeD"));
// a missing required property yields an IllegalStateException whose message names the given type text
224 assertThatIllegalStateException().isThrownBy(() -> oper.getRequiredProperty("propertyUnknown", "some type"))
225 .withMessage("missing some type");
// Runs the operation once through verifyRun(), expecting one callback, one operation
// invocation, and a SUCCESS result.
229 public void testStart() {
230 verifyRun("testStart", 1, 1, OperationResult.SUCCESS);
234 * Tests start() with multiple running requests.
// Verifies that MAX_PARALLEL runs each produce a start callback, an operation invocation,
// and an end callback.
237 public void testStartMultiple() {
// NOTE(review): the loop body is not visible in this view; presumably it starts the operation on every iteration - confirm
238 for (int count = 0; count < MAX_PARALLEL; ++count) {
// allow enough executor steps for all of the runs to finish
242 assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
244 assertNotNull(opstart);
245 assertNotNull(opend);
246 assertEquals(OperationResult.SUCCESS, opend.getResult());
// the callback counters and the operation's own counter must all equal the number of runs
248 assertEquals(MAX_PARALLEL, numStart);
249 assertEquals(MAX_PARALLEL, oper.getCount());
250 assertEquals(MAX_PARALLEL, numEnd);
// Verifies that, once the executor's queued tasks have run, the operation was invoked exactly once.
// NOTE(review): the statement that starts the operation is not visible in this view
254 public void testStartOperationAsync() {
256 assertTrue(executor.runAll(MAX_REQUESTS));
258 assertEquals(1, oper.getCount());
// Verifies isSuccess(): false for a null outcome, true for SUCCESS, false for every other result.
262 public void testIsSuccess() {
263 assertFalse(oper.isSuccess(null));
265 OperationOutcome outcome = new OperationOutcome();
267 outcome.setResult(OperationResult.SUCCESS);
268 assertTrue(oper.isSuccess(outcome));
// the failure's name is used as the assertion message to identify the offending value
270 for (OperationResult failure : FAILURE_RESULTS) {
271 outcome.setResult(failure);
272 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
// Verifies isActorFailed(): it is only true when the result is FAILURE and both the actor
// and the operation of the outcome match this operation.
277 public void testIsActorFailed() {
278 assertFalse(oper.isActorFailed(null));
280 OperationOutcome outcome = params.makeOutcome();
// results other than FAILURE never count as an actor failure
283 outcome.setResult(OperationResult.SUCCESS);
284 assertFalse(oper.isActorFailed(outcome));
286 outcome.setResult(OperationResult.FAILURE_RETRIES);
287 assertFalse(oper.isActorFailed(outcome));
// from here on the result is FAILURE; only the actor and operation vary
290 outcome.setResult(OperationResult.FAILURE);
// wrong or missing actor
293 outcome.setActor(MY_SINK);
294 assertFalse(oper.isActorFailed(outcome));
295 outcome.setActor(null);
296 assertFalse(oper.isActorFailed(outcome));
297 outcome.setActor(ACTOR);
299 // incorrect operation
300 outcome.setOperation(MY_SINK);
301 assertFalse(oper.isActorFailed(outcome));
302 outcome.setOperation(null);
303 assertFalse(oper.isActorFailed(outcome));
304 outcome.setOperation(OPERATION);
// FAILURE with matching actor and operation
307 assertTrue(oper.isActorFailed(outcome));
// Verifies that an operation which does not override doOperation() ends with FAILURE_EXCEPTION.
311 public void testDoOperation() {
313 * Use an operation that doesn't override doOperation().
315 OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
// NOTE(review): the statement that starts oper2 is not visible in this view
318 assertTrue(executor.runAll(MAX_REQUESTS));
// the completion callback must have recorded an outcome carrying the exception result
320 assertNotNull(opend);
321 assertEquals(OperationResult.FAILURE_EXCEPTION, opend.getResult());
// Verifies that an operation which does not finish within its timeout ends with FAILURE_TIMEOUT.
325 public void testTimeout() throws Exception {
327 // use a real executor
328 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
330 // trigger timeout very quickly
331 oper = new MyOper() {
// NOTE(review): the overridden timeout value is not visible in this view
333 protected long getTimeoutMs(Integer timeoutSec) {
338 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
// NOTE(review): outcome2 is prepared here, but the lambda below returns "outcome" - confirm which one is intended
340 OperationOutcome outcome2 = params.makeOutcome();
341 outcome2.setResult(OperationResult.SUCCESS);
344 * Create an incomplete future that will timeout after the operation's
345 * timeout. If it fires before the other timer, then it will return a
348 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
// orTimeout() fails the future after one second; handleAsync() then maps either result to an outcome
349 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
350 params.getExecutor());
// blocks until the operation finishes, which is possible because a real executor is in use
356 assertEquals(OperationResult.FAILURE_TIMEOUT, oper.start().get().getResult());
360 * Tests retry functions, when the count is set to zero and retries are exhausted.
// With a retry count of zero, a failing operation is attempted once and ends with FAILURE.
363 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
364 params = params.toBuilder().retry(0).build();
366 // new params, thus need a new operation
// NOTE(review): the statement that re-creates "oper" is not visible in this view
369 oper.setMaxFailures(10);
371 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, OperationResult.FAILURE);
375 * Tests retry functions, when the count is null and retries are exhausted.
// With a null retry count, a failing operation is attempted once and ends with FAILURE.
378 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
379 params = params.toBuilder().retry(null).build();
381 // new params, thus need a new operation
// NOTE(review): the statement that re-creates "oper" is not visible in this view
384 oper.setMaxFailures(10);
386 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, OperationResult.FAILURE);
390 * Tests retry functions, when retries are exhausted.
// With three retries and more failures than retries, the operation is attempted four times
// (initial attempt plus retries) and ends with FAILURE_RETRIES.
393 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
394 final int maxRetries = 3;
395 params = params.toBuilder().retry(maxRetries).build();
397 // new params, thus need a new operation
// NOTE(review): the statement that re-creates "oper" is not visible in this view
400 oper.setMaxFailures(10);
402 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
403 OperationResult.FAILURE_RETRIES);
407 * Tests retry functions, when a success follows some retries.
// With more retries allowed than configured failures, the operation succeeds on the attempt
// that follows the last failure.
410 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
411 params = params.toBuilder().retry(10).build();
413 // new params, thus need a new operation
// NOTE(review): the statement that re-creates "oper" is not visible in this view
416 final int maxFailures = 3;
417 oper.setMaxFailures(maxFailures);
// maxFailures failing attempts plus the one that succeeds
419 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
420 OperationResult.SUCCESS);
424 * Tests retry functions, when the outcome is {@code null}.
// Verifies that a run still ends with a single FAILURE callback when doOperation() yields no outcome.
427 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
429 // arrange to return null from doOperation()
430 oper = new MyOper() {
432 protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
// the parent implementation is still invoked; NOTE(review): the return statement is not visible in this view
435 super.doOperation(attempt, outcome);
440 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, OperationResult.FAILURE, noop());
// Verifies sleep(): negative and zero durations complete immediately, while a positive
// duration completes only after roughly that much time has elapsed.
444 public void testSleep() throws Exception {
// negative duration: the future is already done
445 CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
446 assertTrue(future.isDone());
447 assertNull(future.get());
// zero duration: the future is already done
450 future = oper.sleep(0, TimeUnit.SECONDS);
451 assertTrue(future.isDone());
452 assertNull(future.get());
455 * Start a second sleep we can use to check the first while it's running.
457 tstart = Instant.now();
458 future = oper.sleep(100, TimeUnit.MILLISECONDS);
460 CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
462 // wait for second to complete and verify that the first has not completed
464 assertFalse(future.isDone());
466 // wait for the first to complete (NOTE(review): the awaiting call is not visible in this view)
// the first sleep was 100 ms; one millisecond of slack is allowed
469 long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
470 assertTrue(diff >= 99);
// Verifies isSameOperation(): true only when both the actor and the operation of the
// outcome match this operation.
474 public void testIsSameOperation() {
475 assertFalse(oper.isSameOperation(null));
477 OperationOutcome outcome = params.makeOutcome();
479 // wrong actor - should be false
480 outcome.setActor(null);
481 assertFalse(oper.isSameOperation(outcome));
482 outcome.setActor(MY_SINK);
483 assertFalse(oper.isSameOperation(outcome));
484 outcome.setActor(ACTOR);
486 // wrong operation - should be false
487 outcome.setOperation(null);
488 assertFalse(oper.isSameOperation(outcome));
489 outcome.setOperation(MY_SINK);
490 assertFalse(oper.isSameOperation(outcome));
491 outcome.setOperation(OPERATION);
// actor and operation both match
493 assertTrue(oper.isSameOperation(outcome));
// Verifies that an exception raised while the operation runs is reported as FAILURE_EXCEPTION.
497 public void testFromException() {
498 // arrange to generate an exception when operation runs
499 oper.setGenException(true);
501 verifyRun("testFromException", 1, 1, OperationResult.FAILURE_EXCEPTION);
505 * Tests fromException() when there is no exception.
// Counterpart of testFromException(): without an injected exception the run ends with SUCCESS.
508 public void testFromExceptionNoExcept() {
509 verifyRun("testFromExceptionNoExcept", 1, 1, OperationResult.SUCCESS);
513 * Tests both flavors of anyOf(), because one invokes the other.
// Verifies anyOf(): the combined future completes with the outcome of whichever task
// completes, regardless of that task's position, and tolerates a supplier that returns null.
516 public void testAnyOf() throws Exception {
517 // first task completes, others do not
518 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
520 final OperationOutcome outcome = params.makeOutcome();
522 tasks.add(() -> CompletableFuture.completedFuture(outcome));
523 tasks.add(() -> new CompletableFuture<>());
// a supplier may yield null instead of a future
524 tasks.add(() -> null);
525 tasks.add(() -> new CompletableFuture<>());
527 CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
528 assertTrue(executor.runAll(MAX_REQUESTS));
529 assertTrue(result.isDone());
530 assertSame(outcome, result.get());
532 // repeat using array form
533 @SuppressWarnings("unchecked")
534 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
535 result = oper.anyOf(tasks.toArray(taskArray));
536 assertTrue(executor.runAll(MAX_REQUESTS));
537 assertTrue(result.isDone());
538 assertSame(outcome, result.get());
540 // second task completes, others do not
// NOTE(review): presumably the list is cleared before being refilled; that statement is not visible in this view
542 tasks.add(() -> new CompletableFuture<>());
543 tasks.add(() -> CompletableFuture.completedFuture(outcome));
544 tasks.add(() -> new CompletableFuture<>());
546 result = oper.anyOf(tasks);
547 assertTrue(executor.runAll(MAX_REQUESTS));
548 assertTrue(result.isDone());
549 assertSame(outcome, result.get());
551 // third task completes, others do not
// NOTE(review): presumably the list is cleared again here - confirm
553 tasks.add(() -> new CompletableFuture<>());
554 tasks.add(() -> new CompletableFuture<>());
555 tasks.add(() -> CompletableFuture.completedFuture(outcome));
557 result = oper.anyOf(tasks);
558 assertTrue(executor.runAll(MAX_REQUESTS));
559 assertTrue(result.isDone());
560 assertSame(outcome, result.get());
564 * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
// Edge cases of anyOf(): no tasks yields null, and a single task yields that task's own future.
567 @SuppressWarnings("unchecked")
568 public void testAnyOfEdge() throws Exception {
569 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
571 // zero items: check both using a list and using an array
572 assertNull(oper.anyOf(tasks));
573 assertNull(oper.anyOf());
575 // one item: check both using a list and using an array
576 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
577 tasks.add(() -> future1);
// the very same future instance is handed back, not a wrapper
579 assertSame(future1, oper.anyOf(tasks));
580 assertSame(future1, oper.anyOf(() -> future1));
// Verifies the varargs form of allOf(): the combined future completes only after every
// task's future has completed, independent of completion order.
584 public void testAllOfArray() throws Exception {
585 final OperationOutcome outcome = params.makeOutcome();
587 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
588 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
589 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
// one of the suppliers yields null instead of a future
591 @SuppressWarnings("unchecked")
592 CompletableFuture<OperationOutcome> result =
593 oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
// nothing has completed yet
595 assertTrue(executor.runAll(MAX_REQUESTS));
596 assertFalse(result.isDone());
597 future1.complete(outcome);
599 // complete 3 before 2
600 assertTrue(executor.runAll(MAX_REQUESTS));
601 assertFalse(result.isDone());
602 future3.complete(outcome);
// still waiting for future2
604 assertTrue(executor.runAll(MAX_REQUESTS));
605 assertFalse(result.isDone());
606 future2.complete(outcome);
608 // all of them are now done
609 assertTrue(executor.runAll(MAX_REQUESTS));
610 assertTrue(result.isDone());
611 assertSame(outcome, result.get());
// Verifies the list form of allOf(): the combined future completes only after every
// task's future has completed, independent of completion order.
615 public void testAllOfList() throws Exception {
616 final OperationOutcome outcome = params.makeOutcome();
618 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
619 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
620 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
622 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
623 tasks.add(() -> future1);
624 tasks.add(() -> future2);
// one of the suppliers yields null instead of a future
625 tasks.add(() -> null);
626 tasks.add(() -> future3);
628 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
// nothing has completed yet
630 assertTrue(executor.runAll(MAX_REQUESTS));
631 assertFalse(result.isDone());
632 future1.complete(outcome);
634 // complete 3 before 2
635 assertTrue(executor.runAll(MAX_REQUESTS));
636 assertFalse(result.isDone());
637 future3.complete(outcome);
// still waiting for future2
639 assertTrue(executor.runAll(MAX_REQUESTS));
640 assertFalse(result.isDone());
641 future2.complete(outcome);
643 // all of them are now done
644 assertTrue(executor.runAll(MAX_REQUESTS));
645 assertTrue(result.isDone());
646 assertSame(outcome, result.get());
650 * Tests both flavors of allOf(), for edge cases: zero items, and one item.
// Edge cases of allOf(): no tasks yields null, and a single task yields that task's own future.
653 @SuppressWarnings("unchecked")
654 public void testAllOfEdge() throws Exception {
655 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
657 // zero items: check both using a list and using an array
658 assertNull(oper.allOf(tasks));
659 assertNull(oper.allOf());
661 // one item: check both using a list and using an array
662 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
663 tasks.add(() -> future1);
// the very same future instance is handed back, not a wrapper
665 assertSame(future1, oper.allOf(tasks));
666 assertSame(future1, oper.allOf(() -> future1));
// Verifies that when a task supplier throws while the futures are being gathered, the
// exception propagates and the futures obtained before it are cancelled.
670 public void testAttachFutures() throws Exception {
671 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
673 // third task throws an exception during construction
674 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
675 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
676 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
677 tasks.add(() -> future1);
678 tasks.add(() -> future2);
// NOTE(review): the lambda header enclosing this throw is not visible in this view
680 throw new IllegalStateException(EXPECTED_EXCEPTION);
682 tasks.add(() -> future3);
684 assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
686 // should have canceled the first two, but not the last
687 assertTrue(future1.isCancelled());
688 assertTrue(future2.isCancelled());
689 assertFalse(future3.isCancelled());
// Verifies how allOf() combines the outcomes of several tasks: which outcome is selected,
// how a null outcome is treated, and how a failed task future is reported.
693 public void testCombineOutcomes() throws Exception {
// single task: its outcome is the result, whatever the result value
695 verifyOutcomes(0, OperationResult.SUCCESS);
696 verifyOutcomes(0, OperationResult.FAILURE_EXCEPTION);
698 // maximum is in different positions
// in each call the first argument is the index of the FAILURE outcome, which is the one expected back
699 verifyOutcomes(0, OperationResult.FAILURE, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD);
700 verifyOutcomes(1, OperationResult.SUCCESS, OperationResult.FAILURE, OperationResult.FAILURE_GUARD);
701 verifyOutcomes(2, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD, OperationResult.FAILURE);
703 // null outcome - takes precedence over a success
704 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
705 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
706 tasks.add(() -> CompletableFuture.completedFuture(null));
707 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
708 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
710 assertTrue(executor.runAll(MAX_REQUESTS));
711 assertTrue(result.isDone());
712 assertNull(result.get());
714 // one throws an exception during execution
715 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
// NOTE(review): presumably the list is cleared before being refilled; that statement is not visible in this view
718 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
719 tasks.add(() -> CompletableFuture.failedFuture(except));
720 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
721 result = oper.allOf(tasks);
723 assertTrue(executor.runAll(MAX_REQUESTS));
724 assertTrue(result.isCompletedExceptionally());
// NOTE(review): an AssertionError thrown inside this callback is captured by the future that
// whenComplete() returns, which is discarded here, so this check cannot fail the test - consider
// asserting on the exception outside the callback
725 result.whenComplete((unused, thrown) -> assertSame(except, thrown));
729 * Tests both flavors of sequence(), because one invokes the other.
// Verifies sequence(): with all tasks succeeding the final outcome is returned, and a
// failing task's outcome becomes the result of the whole sequence.
732 public void testSequence() throws Exception {
733 final OperationOutcome outcome = params.makeOutcome();
735 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
736 tasks.add(() -> CompletableFuture.completedFuture(outcome));
// a supplier may yield null instead of a future
737 tasks.add(() -> null);
738 tasks.add(() -> CompletableFuture.completedFuture(outcome));
739 tasks.add(() -> CompletableFuture.completedFuture(outcome));
741 CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
742 assertTrue(executor.runAll(MAX_REQUESTS));
743 assertTrue(result.isDone());
744 assertSame(outcome, result.get());
746 // repeat using array form
747 @SuppressWarnings("unchecked")
748 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
749 result = oper.sequence(tasks.toArray(taskArray));
750 assertTrue(executor.runAll(MAX_REQUESTS));
751 assertTrue(result.isDone());
752 assertSame(outcome, result.get());
754 // second task fails, third should not run
755 OperationOutcome failure = params.makeOutcome();
756 failure.setResult(OperationResult.FAILURE);
// NOTE(review): presumably the list is cleared before being refilled; that statement is not visible in this view
758 tasks.add(() -> CompletableFuture.completedFuture(outcome));
759 tasks.add(() -> CompletableFuture.completedFuture(failure));
760 tasks.add(() -> CompletableFuture.completedFuture(outcome));
// the failing task's outcome, not the later success, must be the result
762 result = oper.sequence(tasks);
763 assertTrue(executor.runAll(MAX_REQUESTS));
764 assertTrue(result.isDone());
765 assertSame(failure, result.get());
769 * Tests both flavors of sequence(), for edge cases: zero items, and one item.
// Edge cases of sequence(): no tasks yields null, and a single task yields that task's own future.
772 @SuppressWarnings("unchecked")
773 public void testSequenceEdge() throws Exception {
774 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
776 // zero items: check both using a list and using an array
777 assertNull(oper.sequence(tasks));
778 assertNull(oper.sequence());
780 // one item: check both using a list and using an array
781 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
782 tasks.add(() -> future1);
// the very same future instance is handed back, not a wrapper
784 assertSame(future1, oper.sequence(tasks));
785 assertSame(future1, oper.sequence(() -> future1));
// Helper for testCombineOutcomes(): builds one already-completed task per given result,
// combines them with allOf(), and checks that the outcome at index "expected" is the one returned.
//   expected - index, within results, of the outcome that allOf() should yield
//   results  - result value assigned to each task's outcome, in task order
788 private void verifyOutcomes(int expected, OperationResult... results) throws Exception {
789 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
791 OperationOutcome expectedOutcome = null;
793 for (int count = 0; count < results.length; ++count) {
794 OperationOutcome outcome = params.makeOutcome();
795 outcome.setResult(results[count]);
796 tasks.add(() -> CompletableFuture.completedFuture(outcome));
// remember the outcome instance that is expected to be selected
798 if (count == expected) {
799 expectedOutcome = outcome;
803 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
805 assertTrue(executor.runAll(MAX_REQUESTS));
806 assertTrue(result.isDone());
// identity comparison: the same outcome object must come back
807 assertSame(expectedOutcome, result.get());
// Verifies the priority that detmPriority() assigns to each kind of outcome.
811 public void testDetmPriority() throws CoderException {
// a null outcome has priority 1
812 assertEquals(1, oper.detmPriority(null));
814 OperationOutcome outcome = params.makeOutcome();
// expected priority for each result value
816 Map<OperationResult, Integer> map = Map.of(OperationResult.SUCCESS, 0, OperationResult.FAILURE_GUARD, 2,
817 OperationResult.FAILURE_RETRIES, 3, OperationResult.FAILURE, 4, OperationResult.FAILURE_TIMEOUT, 5,
818 OperationResult.FAILURE_EXCEPTION, 6);
// the result's name is used as the assertion message to identify a mismatch
820 for (Entry<OperationResult, Integer> ent : map.entrySet()) {
821 outcome.setResult(ent.getKey());
822 assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
826 * Test null result. We can't actually set it to null, because the set() method
827 * won't allow it. Instead, we decode it from a structure.
829 outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
// an outcome with a null result has priority 1, the same as a null outcome
830 assertEquals(1, oper.detmPriority(outcome));
834 * Tests callbackStarted() when the pipeline has already been stopped.
// Verifies the start callback's behavior when the operation's future is cancelled from
// within that callback.
837 public void testCallbackStartedNotRunning() {
// holder that lets the callback reach the future, which only exists after start() returns
838 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
841 * arrange to stop the controller when the start-callback is invoked, but capture
844 params = params.toBuilder().startCallback(oper -> {
// NOTE(review): other statements of this callback are not visible in this view
846 future.get().cancel(false);
849 // new params, thus need a new operation
// NOTE(review): the statement that re-creates "oper" is not visible in this view
852 future.set(oper.start());
853 assertTrue(executor.runAll(MAX_REQUESTS));
855 // should have only run once
856 assertEquals(1, numStart);
860 * Tests callbackCompleted() when the pipeline has already been stopped.
// Verifies that no completion callback is counted when the operation's future is cancelled
// from within the start callback.
863 public void testCallbackCompletedNotRunning() {
// holder that lets the callback reach the future, which only exists after start() returns
864 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
866 // arrange to stop the controller when the start-callback is invoked
867 params = params.toBuilder().startCallback(oper -> {
868 future.get().cancel(false);
871 // new params, thus need a new operation
// NOTE(review): the statement that re-creates "oper" is not visible in this view
874 future.set(oper.start());
875 assertTrue(executor.runAll(MAX_REQUESTS));
877 // should not have been set
879 assertEquals(0, numEnd);
// Verifies setOutcome(outcome, throwable): a wrapped TimeoutException maps to FAILURE_TIMEOUT,
// any other exception maps to FAILURE_EXCEPTION; both set the failure message.
883 public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
884 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
886 OperationOutcome outcome;
// timeout wrapped in a CompletionException
888 outcome = new OperationOutcome();
889 oper.setOutcome(outcome, timex);
890 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
891 assertEquals(OperationResult.FAILURE_TIMEOUT, outcome.getResult());
// any other exception
893 outcome = new OperationOutcome();
894 oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
895 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
896 assertEquals(OperationResult.FAILURE_EXCEPTION, outcome.getResult());
// Verifies setOutcome(outcome, result): SUCCESS sets the success message, every other
// result sets the failure message, and the result value itself is stored unchanged.
900 public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
901 OperationOutcome outcome;
903 outcome = new OperationOutcome();
904 oper.setOutcome(outcome, OperationResult.SUCCESS);
905 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
906 assertEquals(OperationResult.SUCCESS, outcome.getResult());
// applying SUCCESS a second time to the same outcome leaves it unchanged
908 oper.setOutcome(outcome, OperationResult.SUCCESS);
909 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
910 assertEquals(OperationResult.SUCCESS, outcome.getResult());
// the result's name is used as the assertion message to identify a mismatch
912 for (OperationResult result : FAILURE_RESULTS) {
913 outcome = new OperationOutcome();
914 oper.setOutcome(outcome, result);
915 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
916 assertEquals(result.toString(), result, outcome.getResult());
// Verifies that makeOutcome() uses the AAI_TARGET_ENTITY property as the outcome's target.
921 public void testMakeOutcome() {
922 oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, MY_TARGET_ENTITY);
923 assertEquals(MY_TARGET_ENTITY, oper.makeOutcome().getTarget());
// Verifies isTimeout(): true only for a TimeoutException itself or a CompletionException
// whose direct cause is a TimeoutException.
927 public void testIsTimeout() {
928 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
// other exception types, a missing cause, and a timeout nested more than one level deep do not count
930 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
931 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
932 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
933 assertFalse(oper.isTimeout(new CompletionException(null)));
934 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
936 assertTrue(oper.isTimeout(timex));
937 assertTrue(oper.isTimeout(new CompletionException(timex)));
// Verifies logMessage() by inspecting what the attached appender extracted: structured
// data, plain strings, and null are each logged once, and a coder failure adds a warning line.
941 public void testLogMessage() {
942 final String infraStr = SINK_INFRA.toString();
944 // log structured data
945 appender.clearExtractions();
946 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
947 List<String> output = appender.getExtracted();
948 assertEquals(1, output.size());
// the entry names the infrastructure, the topic and the direction, and contains the pretty-printed object
950 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
951 .contains("{\n \"text\": \"my-text\"\n}");
953 // repeat with a response
954 appender.clearExtractions();
955 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
956 output = appender.getExtracted();
957 assertEquals(1, output.size());
959 assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
960 .contains("{\n \"text\": \"my-text\"\n}");
962 // log a plain string
963 appender.clearExtractions();
964 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
965 output = appender.getExtracted();
966 assertEquals(1, output.size());
967 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
969 // log a null request
970 appender.clearExtractions();
971 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
972 output = appender.getExtracted();
973 assertEquals(1, output.size());
975 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
977 // generate exception from coder
978 setOperCoderException();
// with a failing coder two entries are expected: the pretty-print warning, then the message itself
980 appender.clearExtractions();
981 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
982 output = appender.getExtracted();
983 assertEquals(2, output.size());
984 assertThat(output.get(0)).contains("cannot pretty-print request");
985 assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
987 // repeat with a response
988 appender.clearExtractions();
989 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
990 output = appender.getExtracted();
991 assertEquals(2, output.size());
992 assertThat(output.get(0)).contains("cannot pretty-print response");
993 assertThat(output.get(1)).contains(MY_SOURCE);
// Verifies getRetry(): a null count is treated as zero, any other count is returned as is.
997 public void testGetRetry() {
998 assertEquals(0, oper.getRetry(null));
999 assertEquals(10, oper.getRetry(10));
// Verifies that a plain OperationPartial reports DEFAULT_RETRY_WAIT_MS as its retry wait time.
1003 public void testGetRetryWait() {
1004 // need an operator that doesn't override the retry time
1005 OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1006 assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
// Verifies getTimeoutMs(): seconds are converted to milliseconds, and a null timeout yields zero.
1010 public void testGetTimeOutMs() {
1011 assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1013 params = params.toBuilder().timeoutSec(null).build();
1015 // new params, thus need a new operation
1016 oper = new MyOper();
1018 assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
// Records information about an outcome passed to it; presumably installed as
// the "start" callback of the operation under test - confirm against setUp.
// NOTE(review): only part of this method is visible in this listing; the other
// statements of the body cannot be documented from here.
1021 private void starter(OperationOutcome oper) {
// remember the start time so verifyRun() can compare it against the start and
// end outcomes
1023 tstart = oper.getStart();
// Presumably the "complete" callback counterpart of starter() - TODO confirm.
// NOTE(review): the body of this method is not visible in this listing.
1028 private void completer(OperationOutcome oper) {
1035 * Gets a function that does nothing.
1037 * @param <T> type of input parameter expected by the function
1038 * @return a function that does nothing
// Used by the four-argument verifyRun() as the default "manipulator", i.e. when
// the future returned by the operation should be left untouched.
1040 private <T> Consumer<T> noop() {
// Convenience overload: verifies a run of the operation without modifying the
// future returned when the operation is started.
1048 * @param testName test name
1049 * @param expectedCallbacks number of callbacks expected
1050 * @param expectedOperations number of operation invocations expected
1051 * @param expectedResult expected outcome
1053 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1054 OperationResult expectedResult) {
// delegate to the five-argument overload with a do-nothing manipulator
1056 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
// Starts the operation, lets the caller manipulate the returned future, runs
// the queued executor tasks and then verifies the callbacks, the final outcome,
// the sub-request ID and the number of operation invocations.
// NOTE(review): some lines of this method (e.g. set-up before the start call
// and the opening of the try block) are not visible in this listing.
1062 * @param testName test name
1063 * @param expectedCallbacks number of callbacks expected
1064 * @param expectedOperations number of operation invocations expected
1065 * @param expectedResult expected outcome
1066 * @param manipulator function to modify the future returned by
1067 * {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1068 * in the executor are run
// NOTE(review): the link above names start(ControlLoopOperationParams), but the
// code below calls the no-argument oper.start() - confirm and update the link.
1070 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1071 OperationResult expectedResult, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1079 CompletableFuture<OperationOutcome> future = oper.start();
// give the caller a chance to act on the future (e.g. cancel it) before any
// queued task has run
1081 manipulator.accept(future);
// drive the queued tasks via the test executor, bounded by MAX_REQUESTS;
// presumably runAll() returns true when nothing is left to run - confirm
1083 assertTrue(testName, executor.runAll(MAX_REQUESTS));
// the "start" and "end" callback counters must both match expectedCallbacks
1085 assertEquals(testName, expectedCallbacks, numStart);
1086 assertEquals(testName, expectedCallbacks, numEnd);
// outcome details can only be checked if at least one callback was expected
1088 if (expectedCallbacks > 0) {
1089 assertNotNull(testName, opstart);
1090 assertNotNull(testName, opend);
1091 assertEquals(testName, expectedResult, opend.getResult());
// both recorded outcomes must carry the very same start-time instance that was
// captured in tstart
1093 assertSame(testName, tstart, opstart.getStart());
1094 assertSame(testName, tstart, opend.getStart());
// the future must already be complete and must yield the last "end" outcome
// NOTE(review): unlike its neighbours, this assertion omits the testName message
1097 assertTrue(future.isDone());
1098 assertEquals(testName, opend, future.get());
1100 // "start" is never final
1101 for (OperationOutcome outcome : starts) {
1102 assertFalse(testName, outcome.isFinalOutcome());
1105 // only the last "complete" is final
// removeLast() takes the last outcome out of "ends", so the loop that follows
// only sees the earlier, non-final ones (note: this modifies "ends")
1106 assertTrue(testName, ends.removeLast().isFinalOutcome());
1108 for (OperationOutcome outcome : ends) {
// NOTE(review): this assertion also omits the testName message
1109 assertFalse(outcome.isFinalOutcome());
// future.get() declares checked exceptions; they are wrapped in an unchecked
// exception so callers of verifyRun() need not declare them
1112 } catch (InterruptedException | ExecutionException e) {
1113 throw new IllegalStateException(e);
// the sub-request ID is only checked when the operation was expected to be
// invoked at least once
1116 if (expectedOperations > 0) {
1117 assertNotNull(testName, oper.getSubRequestId());
1118 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
1119 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
// finally, the operation's invocation count must match expectedOperations
1123 assertEquals(testName, expectedOperations, oper.getCount());
1127 * Creates a new {@link #oper} whose coder will throw an exception.
// Used by the log-message test to exercise the "cannot pretty-print" paths.
1129 private void setOperCoderException() {
// replace the shared operation with a MyOper subclass that supplies its own coder
1130 oper = new MyOper() {
1132 protected Coder getCoder() {
1133 return new StandardCoder() {
// every encode(Object, boolean) call fails with a CoderException carrying
// EXPECTED_EXCEPTION
1135 public String encode(Object object, boolean pretty) throws CoderException {
1136 throw new CoderException(EXPECTED_EXCEPTION);
// Simple data holder passed to logMessage() by the log-message test as a
// request/response object to be pretty-printed.
1145 public static class MyData {
// the log-message test expects the encoded form to contain "text": "my-text",
// so TEXT is presumably "my-text" - confirm against the constant's declaration
1146 private String text = TEXT;
1150 private class MyOper extends OperationPartial {
1152 private int count = 0;
1155 private boolean genException;
1157 private int maxFailures = 0;
1159 private CompletableFuture<OperationOutcome> preProc;
1163 super(OperationPartialTest.this.params, config, PROP_NAMES);
1167 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1170 throw new IllegalStateException(EXPECTED_EXCEPTION);
1173 operation.setSubRequestId(String.valueOf(attempt));
1175 if (count > maxFailures) {
1176 operation.setResult(OperationResult.SUCCESS);
1178 operation.setResult(OperationResult.FAILURE);
1185 protected long getRetryWaitMs() {
1187 * Sleep timers run in the background, but we want to control things via the
1188 * "executor", thus we avoid sleep timers altogether by simply returning 0.