2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.when;
34 import ch.qos.logback.classic.Logger;
35 import java.time.Instant;
36 import java.util.ArrayDeque;
37 import java.util.Arrays;
38 import java.util.Collections;
39 import java.util.Deque;
40 import java.util.LinkedList;
41 import java.util.List;
43 import java.util.Map.Entry;
44 import java.util.UUID;
45 import java.util.concurrent.CompletableFuture;
46 import java.util.concurrent.CompletionException;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.ForkJoinPool;
49 import java.util.concurrent.Future;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import java.util.concurrent.atomic.AtomicReference;
53 import java.util.function.Consumer;
54 import java.util.function.Supplier;
55 import java.util.stream.Collectors;
58 import org.junit.AfterClass;
59 import org.junit.Before;
60 import org.junit.BeforeClass;
61 import org.junit.Test;
62 import org.mockito.Mock;
63 import org.mockito.MockitoAnnotations;
64 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
65 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
66 import org.onap.policy.common.utils.coder.Coder;
67 import org.onap.policy.common.utils.coder.CoderException;
68 import org.onap.policy.common.utils.coder.StandardCoder;
69 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
70 import org.onap.policy.common.utils.time.PseudoExecutor;
71 import org.onap.policy.controlloop.ControlLoopOperation;
72 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
73 import org.onap.policy.controlloop.actorserviceprovider.Operation;
74 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
75 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
76 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
77 import org.onap.policy.controlloop.actorserviceprovider.Operator;
78 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
79 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
80 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
81 import org.slf4j.LoggerFactory;
83 public class OperationPartialTest {
// Topic infrastructure values passed to logMessage() by testLogMessage().
84 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
85 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
// Limit handed to executor.runAll() when draining queued tasks.
86 private static final int MAX_REQUESTS = 100;
// Loop count and expected start/end totals in testStartMultiple().
87 private static final int MAX_PARALLEL = 10;
// Message carried by exceptions that the tests raise on purpose.
88 private static final String EXPECTED_EXCEPTION = "expected exception";
// Actor and operation names used to build the params in setUp().
89 private static final String ACTOR = "my-actor";
90 private static final String OPERATION = "my-operation";
// Topic names for testLogMessage(); MY_SINK also serves as a deliberately wrong
// actor/operation name in testIsActorFailed() and testIsSameOperation().
91 private static final String MY_SINK = "my-sink";
92 private static final String MY_SOURCE = "my-source";
93 private static final String MY_TARGET_ENTITY = "my-entity";
// Plain-string payload logged by testLogMessage().
94 private static final String TEXT = "my-text";
// Passed to timeoutSec() in setUp(), so this value is in seconds.
95 private static final int TIMEOUT = 1000;
// Request ID placed in the params built by setUp().
96 private static final UUID REQ_ID = UUID.randomUUID();
98 private static final List<OperationResult> FAILURE_RESULTS = Arrays.asList(OperationResult.values()).stream()
99 .filter(result -> result != OperationResult.SUCCESS).collect(Collectors.toList());
102 * Used to attach an appender to the class' logger.
// Logback logger of the class under test (OperationPartial), cast so that an
// appender can be added to it.
104 private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
// Captures log output; testLogMessage() reads it back via getExtracted().
105 private static final ExtractAppender appender = new ExtractAppender();
// Property names that testGetPropertyNames() expects the operation to report.
107 private static final List<String> PROP_NAMES = List.of("hello", "world");
// Collaborators stubbed with when(...) in setUp(). Presumably @Mock fields populated by
// MockitoAnnotations.initMocks(this); the annotations are not visible in this view.
110 private ActorService service;
112 private Actor guardActor;
114 private Operator guardOperator;
116 private Operation guardOperation;
// Executor that queues tasks until a test calls runAll(); recreated in setUp().
118 private PseudoExecutor executor;
// Operation parameters built in setUp(); several tests rebuild them via toBuilder().
119 private ControlLoopOperationParams params;
// Count compared against the expected number of start callbacks.
// NOTE(review): presumably incremented by starter() -- that line is not visible here.
123 private int numStart;
// Written by starter() (outcome start time) and by testSleep() (reference instant).
126 private Instant tstart;
// NOTE(review): presumably the latest outcomes seen by starter()/completer() -- confirm.
128 private OperationOutcome opstart;
129 private OperationOutcome opend;
// Deques created fresh in setUp(); how they are filled is not visible in this view.
131 private Deque<OperationOutcome> starts;
132 private Deque<OperationOutcome> ends;
// Operator configuration wrapping the pseudo executor; built in setUp().
134 private OperatorConfig config;
137 * Attaches the appender to the logger.
// Gives the shared appender the logger's context, then registers it on the logger so
// that log output of the class under test can be inspected by the tests.
// NOTE(review): the JUnit annotation and the source lines between setContext() and
// addAppender() are not visible in this view -- confirm the appender gets started.
140 public static void setUpBeforeClass() throws Exception {
142 * Attach appender to the logger.
144 appender.setContext(logger.getLoggerContext());
147 logger.addAppender(appender);
151 * Stops the appender.
154 public static void tearDownAfterClass() {
159 * Initializes the fields, including {@link #oper}.
// Per-test fixture: initializes the Mockito mocks, creates a fresh PseudoExecutor, and
// builds params wired to this class's starter()/completer() callbacks.
162 public void setUp() {
163 MockitoAnnotations.initMocks(this);
164 executor = new PseudoExecutor();
166 params = ControlLoopOperationParams.builder().completeCallback(this::completer).requestId(REQ_ID)
167 .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
168 .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
// Stub the guard lookup chain (service -> actor -> operator -> operation) so that the
// guard operation's start() yields an already-completed SUCCESS outcome.
170 when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
171 when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
172 when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
173 when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
175 config = new OperatorConfig(executor);
// NOTE(review): source lines 176-183 (presumably creating "oper" and resetting the
// counters/outcomes) are not visible in this view.
184 starts = new ArrayDeque<>(10);
185 ends = new ArrayDeque<>(10);
// Checks the actor name, the operation name, and the dotted "actor.operation" full name.
189 public void testOperatorPartial_testGetActorName_testGetName() {
190 assertEquals(ACTOR, oper.getActorName());
191 assertEquals(OPERATION, oper.getName());
192 assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
// Submits a task to the blocking executor of a freshly built OperatorPartial and waits
// up to five seconds for it to complete the future.
// NOTE(review): the body of the anonymous buildOperation() is not visible in this view.
196 public void testGetBlockingThread() throws Exception {
197 CompletableFuture<Void> future = new CompletableFuture<>();
199 // use the real executor
200 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
202 public Operation buildOperation(ControlLoopOperationParams params) {
207 oper2.getBlockingExecutor().execute(() -> future.complete(null));
// get() times out with an exception if the task never ran; a Void future yields null
209 assertNull(future.get(5, TimeUnit.SECONDS));
// The operation must report exactly the property names in PROP_NAMES.
213 public void testGetPropertyNames() {
214 assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
// Stores string and integer properties, reads them back, and checks that a missing
// required property raises IllegalStateException with "missing <type description>".
218 public void testGetProperty_testSetProperty_testGetRequiredProperty() {
219 oper.setProperty("propertyA", "valueA");
220 oper.setProperty("propertyB", "valueB");
221 oper.setProperty("propertyC", 20);
222 oper.setProperty("propertyD", "valueD");
224 assertEquals("valueA", oper.getProperty("propertyA"));
225 assertEquals("valueB", oper.getProperty("propertyB"));
226 assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
// a property that exists is returned as-is by getRequiredProperty()
228 assertEquals("valueD", oper.getRequiredProperty("propertyD", "typeD"));
230 assertThatIllegalStateException().isThrownBy(() -> oper.getRequiredProperty("propertyUnknown", "some type"))
231 .withMessage("missing some type");
// Runs via verifyRun(), expecting one callback, one operation invocation, and SUCCESS.
235 public void testStart() {
236 verifyRun("testStart", 1, 1, OperationResult.SUCCESS);
240 * Tests start() with multiple running requests.
// After draining the executor, expects MAX_PARALLEL start callbacks, operation
// invocations, and end callbacks, with the final outcome being SUCCESS.
// NOTE(review): the loop body (presumably oper.start()) is not visible in this view.
243 public void testStartMultiple() {
244 for (int count = 0; count < MAX_PARALLEL; ++count) {
248 assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
250 assertNotNull(opstart);
251 assertNotNull(opend);
252 assertEquals(OperationResult.SUCCESS, opend.getResult());
254 assertEquals(MAX_PARALLEL, numStart);
255 assertEquals(MAX_PARALLEL, oper.getCount());
256 assertEquals(MAX_PARALLEL, numEnd);
// Drains the executor and expects the operation to have been invoked exactly once.
// NOTE(review): the statement that starts the operation is not visible in this view.
260 public void testStartOperationAsync() {
262 assertTrue(executor.runAll(MAX_REQUESTS));
264 assertEquals(1, oper.getCount());
// isSuccess() must be false for a null outcome, true for SUCCESS, and false for every
// other OperationResult value.
268 public void testIsSuccess() {
269 assertFalse(oper.isSuccess(null));
271 OperationOutcome outcome = new OperationOutcome();
273 outcome.setResult(OperationResult.SUCCESS);
274 assertTrue(oper.isSuccess(outcome));
276 for (OperationResult failure : FAILURE_RESULTS) {
277 outcome.setResult(failure);
// the assertion message identifies which result value broke the expectation
278 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
// isActorFailed() must be true only when the result is FAILURE and both the actor and
// the operation of the outcome match this operation.
283 public void testIsActorFailed() {
284 assertFalse(oper.isActorFailed(null));
286 OperationOutcome outcome = params.makeOutcome(null);
// results other than FAILURE never count as an actor failure
289 outcome.setResult(OperationResult.SUCCESS);
290 assertFalse(oper.isActorFailed(outcome));
292 outcome.setResult(OperationResult.FAILURE_RETRIES);
293 assertFalse(oper.isActorFailed(outcome));
296 outcome.setResult(OperationResult.FAILURE);
// incorrect or missing actor; the correct actor is restored afterwards
299 outcome.setActor(MY_SINK);
300 assertFalse(oper.isActorFailed(outcome));
301 outcome.setActor(null);
302 assertFalse(oper.isActorFailed(outcome));
303 outcome.setActor(ACTOR);
305 // incorrect operation
306 outcome.setOperation(MY_SINK);
307 assertFalse(oper.isActorFailed(outcome));
308 outcome.setOperation(null);
309 assertFalse(oper.isActorFailed(outcome));
310 outcome.setOperation(OPERATION);
// FAILURE with matching actor and operation
313 assertTrue(oper.isActorFailed(outcome));
// An OperationPartial that does not override doOperation() must end with a
// FAILURE_EXCEPTION outcome.
// NOTE(review): the statement that starts oper2 is not visible in this view.
317 public void testDoOperation() {
319 * Use an operation that doesn't override doOperation().
321 OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
324 assertTrue(executor.runAll(MAX_REQUESTS));
326 assertNotNull(opend);
327 assertEquals(OperationResult.FAILURE_EXCEPTION, opend.getResult());
// Runs on the real common fork-join pool and overrides the operation so that its own
// timeout fires before the one-second fallback; the result must be FAILURE_TIMEOUT.
// NOTE(review): the getTimeoutMs() body and the return of startOperationAsync() are not
// visible in this view.
331 public void testTimeout() throws Exception {
333 // use a real executor
334 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
336 // trigger timeout very quickly
337 oper = new MyOper() {
339 protected long getTimeoutMs(Integer timeoutSec) {
344 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
346 OperationOutcome outcome2 = params.makeOutcome(null);
347 outcome2.setResult(OperationResult.SUCCESS);
350 * Create an incomplete future that will timeout after the operation's
351 * timeout. If it fires before the other timer, then it will return a
354 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
// NOTE(review): the handler returns "outcome" (the parameter), not "outcome2", so the
// SUCCESS outcome built above appears unused in the visible lines -- confirm intent.
355 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
356 params.getExecutor());
362 assertEquals(OperationResult.FAILURE_TIMEOUT, oper.start().get().getResult());
366 * Tests retry functions, when the count is set to zero and retries are exhausted.
// With retry(0) and failures forced, expects a single attempt ending in plain FAILURE.
// NOTE(review): the re-creation of "oper" after rebuilding params is not visible here.
369 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
370 params = params.toBuilder().retry(0).build();
372 // new params, thus need a new operation
375 oper.setMaxFailures(10);
377 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, OperationResult.FAILURE);
381 * Tests retry functions, when the count is null and retries are exhausted.
// With retry(null) and failures forced, expects a single attempt ending in plain FAILURE.
// NOTE(review): the re-creation of "oper" after rebuilding params is not visible here.
384 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
385 params = params.toBuilder().retry(null).build();
387 // new params, thus need a new operation
390 oper.setMaxFailures(10);
392 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, OperationResult.FAILURE);
396 * Tests retry functions, when retries are exhausted.
// Allows three retries while forcing more failures than that: expects the initial
// attempt plus every retry (maxRetries + 1) and a FAILURE_RETRIES result.
// NOTE(review): the re-creation of "oper" after rebuilding params is not visible here.
399 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
400 final int maxRetries = 3;
401 params = params.toBuilder().retry(maxRetries).build();
403 // new params, thus need a new operation
406 oper.setMaxFailures(10);
408 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
409 OperationResult.FAILURE_RETRIES);
413 * Tests retry functions, when a success follows some retries.
// Allows ten retries but forces only three failures: expects maxFailures + 1 attempts,
// the last of which yields SUCCESS.
// NOTE(review): the re-creation of "oper" after rebuilding params is not visible here.
416 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
417 params = params.toBuilder().retry(10).build();
419 // new params, thus need a new operation
422 final int maxFailures = 3;
423 oper.setMaxFailures(maxFailures);
425 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
426 OperationResult.SUCCESS);
430 * Tests retry functions, when the outcome is {@code null}.
// Overrides doOperation() in an anonymous MyOper and expects a single attempt that is
// reported as FAILURE.
// NOTE(review): the return statement of the override is not visible in this view.
433 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
435 // arrange to return null from doOperation()
436 oper = new MyOper() {
438 protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
441 super.doOperation(attempt, outcome);
446 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, OperationResult.FAILURE, noop());
// sleep() with a negative or zero duration must return an already-completed future; a
// positive duration must not complete early (checked against a shorter second sleep).
// NOTE(review): the two blocking waits on the futures are not visible in this view.
450 public void testSleep() throws Exception {
451 CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
452 assertTrue(future.isDone());
453 assertNull(future.get());
456 future = oper.sleep(0, TimeUnit.SECONDS);
457 assertTrue(future.isDone());
458 assertNull(future.get());
461 * Start a second sleep we can use to check the first while it's running.
463 tstart = Instant.now();
464 future = oper.sleep(100, TimeUnit.MILLISECONDS);
466 CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
468 // wait for second to complete and verify that the first has not completed
470 assertFalse(future.isDone());
472 // wait for first to complete
475 long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
// 99 rather than 100 leaves one millisecond of slack for the 100 ms sleep
476 assertTrue(diff >= 99);
// isSameOperation() must be true only when both the actor and the operation of the
// outcome match this operation; a null outcome yields false.
480 public void testIsSameOperation() {
481 assertFalse(oper.isSameOperation(null));
483 OperationOutcome outcome = params.makeOutcome(null);
485 // wrong actor - should be false
486 outcome.setActor(null);
487 assertFalse(oper.isSameOperation(outcome));
488 outcome.setActor(MY_SINK);
489 assertFalse(oper.isSameOperation(outcome));
490 outcome.setActor(ACTOR);
492 // wrong operation - should be false
493 outcome.setOperation(null);
494 assertFalse(oper.isSameOperation(outcome));
495 outcome.setOperation(MY_SINK);
496 assertFalse(oper.isSameOperation(outcome));
497 outcome.setOperation(OPERATION);
// actor and operation both match
499 assertTrue(oper.isSameOperation(outcome));
// An exception raised while the operation runs must surface as FAILURE_EXCEPTION after
// a single attempt.
503 public void testFromException() {
504 // arrange to generate an exception when operation runs
505 oper.setGenException(true);
507 verifyRun("testFromException", 1, 1, OperationResult.FAILURE_EXCEPTION);
511 * Tests fromException() when there is no exception.
// Without a forced exception, the single attempt must end in SUCCESS.
514 public void testFromExceptionNoExcept() {
515 verifyRun("testFromExceptionNoExcept", 1, 1, OperationResult.SUCCESS);
519 * Tests both flavors of anyOf(), because one invokes the other.
// Places the one completed future at each position in turn (first, second, third) and
// expects anyOf() to finish with its outcome. A supplier returning null is also included.
// NOTE(review): the statements resetting "tasks" between scenarios are not visible in
// this view.
522 public void testAnyOf() throws Exception {
523 // first task completes, others do not
524 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
526 final OperationOutcome outcome = params.makeOutcome(null);
528 tasks.add(() -> CompletableFuture.completedFuture(outcome));
529 tasks.add(() -> new CompletableFuture<>());
530 tasks.add(() -> null);
531 tasks.add(() -> new CompletableFuture<>());
533 CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
534 assertTrue(executor.runAll(MAX_REQUESTS));
535 assertTrue(result.isDone());
536 assertSame(outcome, result.get());
538 // repeat using array form
539 @SuppressWarnings("unchecked")
540 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
541 result = oper.anyOf(tasks.toArray(taskArray));
542 assertTrue(executor.runAll(MAX_REQUESTS));
543 assertTrue(result.isDone());
544 assertSame(outcome, result.get());
546 // second task completes, others do not
548 tasks.add(() -> new CompletableFuture<>());
549 tasks.add(() -> CompletableFuture.completedFuture(outcome));
550 tasks.add(() -> new CompletableFuture<>());
552 result = oper.anyOf(tasks);
553 assertTrue(executor.runAll(MAX_REQUESTS));
554 assertTrue(result.isDone());
555 assertSame(outcome, result.get());
557 // third task completes, others do not
559 tasks.add(() -> new CompletableFuture<>());
560 tasks.add(() -> new CompletableFuture<>());
561 tasks.add(() -> CompletableFuture.completedFuture(outcome));
563 result = oper.anyOf(tasks);
564 assertTrue(executor.runAll(MAX_REQUESTS));
565 assertTrue(result.isDone());
566 assertSame(outcome, result.get());
570 * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
// With no tasks, anyOf() returns null; with one task it returns that task's own future.
573 @SuppressWarnings("unchecked")
574 public void testAnyOfEdge() throws Exception {
575 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
577 // zero items: check both using a list and using an array
578 assertNull(oper.anyOf(tasks));
579 assertNull(oper.anyOf());
581 // one item: check both using a list and using an array
582 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
583 tasks.add(() -> future1);
585 assertSame(future1, oper.anyOf(tasks));
586 assertSame(future1, oper.anyOf(() -> future1));
// Varargs form of allOf(): the combined future must stay incomplete until all three
// futures finish, whatever the order. A supplier returning null is also included.
590 public void testAllOfArray() throws Exception {
591 final OperationOutcome outcome = params.makeOutcome(null);
593 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
594 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
595 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
597 @SuppressWarnings("unchecked")
598 CompletableFuture<OperationOutcome> result =
599 oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
// nothing has completed yet
601 assertTrue(executor.runAll(MAX_REQUESTS));
602 assertFalse(result.isDone());
603 future1.complete(outcome);
605 // complete 3 before 2
606 assertTrue(executor.runAll(MAX_REQUESTS));
607 assertFalse(result.isDone());
608 future3.complete(outcome);
610 assertTrue(executor.runAll(MAX_REQUESTS));
611 assertFalse(result.isDone());
612 future2.complete(outcome);
614 // all of them are now done
615 assertTrue(executor.runAll(MAX_REQUESTS));
616 assertTrue(result.isDone());
617 assertSame(outcome, result.get());
// List form of allOf(): the combined future must stay incomplete until all three
// futures finish, whatever the order. A supplier returning null is also included.
621 public void testAllOfList() throws Exception {
622 final OperationOutcome outcome = params.makeOutcome(null);
624 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
625 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
626 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
628 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
629 tasks.add(() -> future1);
630 tasks.add(() -> future2);
631 tasks.add(() -> null);
632 tasks.add(() -> future3);
634 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
// nothing has completed yet
636 assertTrue(executor.runAll(MAX_REQUESTS));
637 assertFalse(result.isDone());
638 future1.complete(outcome);
640 // complete 3 before 2
641 assertTrue(executor.runAll(MAX_REQUESTS));
642 assertFalse(result.isDone());
643 future3.complete(outcome);
645 assertTrue(executor.runAll(MAX_REQUESTS));
646 assertFalse(result.isDone());
647 future2.complete(outcome);
649 // all of them are now done
650 assertTrue(executor.runAll(MAX_REQUESTS));
651 assertTrue(result.isDone());
652 assertSame(outcome, result.get());
656 * Tests both flavors of allOf(), for edge cases: zero items, and one item.
// With no tasks, allOf() returns null; with one task it returns that task's own future.
659 @SuppressWarnings("unchecked")
660 public void testAllOfEdge() throws Exception {
661 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
663 // zero items: check both using a list and using an array
664 assertNull(oper.allOf(tasks));
665 assertNull(oper.allOf());
667 // one item: check both using a list and using an array
668 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
669 tasks.add(() -> future1);
671 assertSame(future1, oper.allOf(tasks));
672 assertSame(future1, oper.allOf(() -> future1));
// When a supplier throws while the futures are being collected, the exception must
// propagate out of anyOf() and the futures already obtained must be cancelled.
// NOTE(review): the lambda opening around the "throw" line is not visible in this view.
676 public void testAttachFutures() throws Exception {
677 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
679 // third task throws an exception during construction
680 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
681 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
682 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
683 tasks.add(() -> future1);
684 tasks.add(() -> future2);
686 throw new IllegalStateException(EXPECTED_EXCEPTION);
688 tasks.add(() -> future3);
690 assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
692 // should have canceled the first two, but not the last
693 assertTrue(future1.isCancelled());
694 assertTrue(future2.isCancelled());
695 assertFalse(future3.isCancelled());
// Checks which outcome allOf() selects when combining results: single outcomes, the
// expected outcome at each position, a null outcome among successes, and a failed future.
// NOTE(review): the statement resetting "tasks" before the last scenario is not visible
// in this view.
699 public void testCombineOutcomes() throws Exception {
701 verifyOutcomes(0, OperationResult.SUCCESS);
702 verifyOutcomes(0, OperationResult.FAILURE_EXCEPTION);
704 // maximum is in different positions
705 verifyOutcomes(0, OperationResult.FAILURE, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD);
706 verifyOutcomes(1, OperationResult.SUCCESS, OperationResult.FAILURE, OperationResult.FAILURE_GUARD);
707 verifyOutcomes(2, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD, OperationResult.FAILURE);
709 // null outcome - takes precedence over a success
710 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
711 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
712 tasks.add(() -> CompletableFuture.completedFuture(null));
713 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
714 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
716 assertTrue(executor.runAll(MAX_REQUESTS));
717 assertTrue(result.isDone());
718 assertNull(result.get());
720 // one throws an exception during execution
721 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
724 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
725 tasks.add(() -> CompletableFuture.failedFuture(except));
726 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
727 result = oper.allOf(tasks);
729 assertTrue(executor.runAll(MAX_REQUESTS));
730 assertTrue(result.isCompletedExceptionally());
// NOTE(review): an AssertionError thrown inside whenComplete() is not rethrown to the
// caller, and the returned stage is discarded here, so this check cannot fail the test.
// Consider capturing "thrown" and asserting on it outside the callback.
731 result.whenComplete((unused, thrown) -> assertSame(except, thrown));
735 * Tests both flavors of sequence(), because one invokes the other.
// Runs a chain of completed tasks (with a null-returning supplier in between) through
// both forms, then checks that a FAILURE outcome in the middle becomes the final result.
// NOTE(review): the statement resetting "tasks" before the failure scenario is not
// visible in this view.
738 public void testSequence() throws Exception {
739 final OperationOutcome outcome = params.makeOutcome(null);
741 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
742 tasks.add(() -> CompletableFuture.completedFuture(outcome));
743 tasks.add(() -> null);
744 tasks.add(() -> CompletableFuture.completedFuture(outcome));
745 tasks.add(() -> CompletableFuture.completedFuture(outcome));
747 CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
748 assertTrue(executor.runAll(MAX_REQUESTS));
749 assertTrue(result.isDone());
750 assertSame(outcome, result.get());
752 // repeat using array form
753 @SuppressWarnings("unchecked")
754 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
755 result = oper.sequence(tasks.toArray(taskArray));
756 assertTrue(executor.runAll(MAX_REQUESTS));
757 assertTrue(result.isDone());
758 assertSame(outcome, result.get());
760 // second task fails, third should not run
761 OperationOutcome failure = params.makeOutcome(null);
762 failure.setResult(OperationResult.FAILURE);
764 tasks.add(() -> CompletableFuture.completedFuture(outcome));
765 tasks.add(() -> CompletableFuture.completedFuture(failure));
766 tasks.add(() -> CompletableFuture.completedFuture(outcome));
768 result = oper.sequence(tasks);
769 assertTrue(executor.runAll(MAX_REQUESTS));
770 assertTrue(result.isDone());
771 assertSame(failure, result.get());
775 * Tests both flavors of sequence(), for edge cases: zero items, and one item.
// With no tasks, sequence() returns null; with one task it returns that task's own future.
778 @SuppressWarnings("unchecked")
779 public void testSequenceEdge() throws Exception {
780 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
782 // zero items: check both using a list and using an array
783 assertNull(oper.sequence(tasks));
784 assertNull(oper.sequence());
786 // one item: check both using a list and using an array
787 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
788 tasks.add(() -> future1);
790 assertSame(future1, oper.sequence(tasks));
791 assertSame(future1, oper.sequence(() -> future1));
// Helper for testCombineOutcomes(): builds one already-completed task per given result,
// combines them with allOf(), and asserts that the combined future yields the outcome
// created at index "expected" (the same instance, not merely an equal one).
794 private void verifyOutcomes(int expected, OperationResult... results) throws Exception {
795 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
797 OperationOutcome expectedOutcome = null;
799 for (int count = 0; count < results.length; ++count) {
800 OperationOutcome outcome = params.makeOutcome(null);
801 outcome.setResult(results[count]);
802 tasks.add(() -> CompletableFuture.completedFuture(outcome));
// remember the instance that allOf() is expected to select
804 if (count == expected) {
805 expectedOutcome = outcome;
809 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
811 assertTrue(executor.runAll(MAX_REQUESTS));
812 assertTrue(result.isDone());
813 assertSame(expectedOutcome, result.get());
// Pins the priority assigned to each result: SUCCESS=0, null outcome or null result=1,
// FAILURE_GUARD=2, FAILURE_RETRIES=3, FAILURE=4, FAILURE_TIMEOUT=5, FAILURE_EXCEPTION=6.
817 public void testDetmPriority() throws CoderException {
818 assertEquals(1, oper.detmPriority(null));
820 OperationOutcome outcome = params.makeOutcome(null);
822 Map<OperationResult, Integer> map = Map.of(OperationResult.SUCCESS, 0, OperationResult.FAILURE_GUARD, 2,
823 OperationResult.FAILURE_RETRIES, 3, OperationResult.FAILURE, 4, OperationResult.FAILURE_TIMEOUT, 5,
824 OperationResult.FAILURE_EXCEPTION, 6);
826 for (Entry<OperationResult, Integer> ent : map.entrySet()) {
827 outcome.setResult(ent.getKey());
// the result name is used as the assertion message
828 assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
832 * Test null result. We can't actually set it to null, because the set() method
833 * won't allow it. Instead, we decode it from a structure.
835 outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
836 assertEquals(1, oper.detmPriority(outcome));
840 * Tests callbackStarted() when the pipeline has already been stopped.
// Replaces the start callback with one that cancels the operation's own future (held in
// an AtomicReference because the future only exists after start() returns), then checks
// that exactly one start was recorded.
// NOTE(review): part of the callback body and the re-creation of "oper" are not visible
// in this view.
843 public void testCallbackStartedNotRunning() {
844 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
847 * arrange to stop the controller when the start-callback is invoked, but capture
850 params = params.toBuilder().startCallback(oper -> {
852 future.get().cancel(false);
855 // new params, thus need a new operation
858 future.set(oper.start());
859 assertTrue(executor.runAll(MAX_REQUESTS));
861 // should have only run once
862 assertEquals(1, numStart);
866 * Tests callbackCompleted() when the pipeline has already been stopped.
// Cancels the operation's future from within the start callback and checks that no
// completion callback was counted afterwards.
// NOTE(review): the re-creation of "oper" and one assertion after the "should not have
// been set" comment are not visible in this view.
869 public void testCallbackCompletedNotRunning() {
870 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
872 // arrange to stop the controller when the start-callback is invoked
873 params = params.toBuilder().startCallback(oper -> {
874 future.get().cancel(false);
877 // new params, thus need a new operation
880 future.set(oper.start());
881 assertTrue(executor.runAll(MAX_REQUESTS));
883 // should not have been set
885 assertEquals(0, numEnd);
// setOutcome(outcome, throwable) must map a wrapped TimeoutException to FAILURE_TIMEOUT
// and any other exception to FAILURE_EXCEPTION, using FAILED_MSG in both cases.
889 public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
890 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
892 OperationOutcome outcome;
894 outcome = new OperationOutcome();
895 oper.setOutcome(outcome, timex);
896 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
897 assertEquals(OperationResult.FAILURE_TIMEOUT, outcome.getResult());
899 outcome = new OperationOutcome();
900 oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
901 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
902 assertEquals(OperationResult.FAILURE_EXCEPTION, outcome.getResult());
// setOutcome(outcome, result) must store the result and set SUCCESS_MSG for SUCCESS or
// FAILED_MSG for every other result.
906 public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
907 OperationOutcome outcome;
909 outcome = new OperationOutcome();
910 oper.setOutcome(outcome, OperationResult.SUCCESS);
911 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
912 assertEquals(OperationResult.SUCCESS, outcome.getResult());
// second call reuses the same, already-populated outcome object
914 oper.setOutcome(outcome, OperationResult.SUCCESS);
915 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
916 assertEquals(OperationResult.SUCCESS, outcome.getResult());
918 for (OperationResult result : FAILURE_RESULTS) {
919 outcome = new OperationOutcome();
920 oper.setOutcome(outcome, result);
921 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
922 assertEquals(result.toString(), result, outcome.getResult());
// isTimeout() must accept only a TimeoutException itself or one wrapped directly in a
// single CompletionException; deeper nesting or other wrappers must be rejected.
927 public void testIsTimeout() {
928 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
930 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
931 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
932 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
933 assertFalse(oper.isTimeout(new CompletionException(null)));
934 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
936 assertTrue(oper.isTimeout(timex));
937 assertTrue(oper.isTimeout(new CompletionException(timex)));
// Exercises logMessage() with structured data, a plain string, and null, then repeats
// with a coder that fails. Each call's output is read back through the ExtractAppender.
941 public void testLogMessage() {
942 final String infraStr = SINK_INFRA.toString();
944 // log structured data
945 appender.clearExtractions();
946 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
947 List<String> output = appender.getExtracted();
948 assertEquals(1, output.size());
// the object is expected to appear as pretty-printed JSON
950 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
951 .contains("{\n \"text\": \"my-text\"\n}");
953 // repeat with a response
954 appender.clearExtractions();
955 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
956 output = appender.getExtracted();
957 assertEquals(1, output.size());
959 assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
960 .contains("{\n \"text\": \"my-text\"\n}");
962 // log a plain string
963 appender.clearExtractions();
964 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
965 output = appender.getExtracted();
966 assertEquals(1, output.size());
967 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
969 // log a null request
970 appender.clearExtractions();
971 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
972 output = appender.getExtracted();
973 assertEquals(1, output.size());
975 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
977 // generate exception from coder
978 setOperCoderException();
// with a failing coder, two entries are expected: the pretty-print complaint first,
// then the message itself
980 appender.clearExtractions();
981 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
982 output = appender.getExtracted();
983 assertEquals(2, output.size());
984 assertThat(output.get(0)).contains("cannot pretty-print request");
985 assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
987 // repeat with a response
988 appender.clearExtractions();
989 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
990 output = appender.getExtracted();
991 assertEquals(2, output.size());
992 assertThat(output.get(0)).contains("cannot pretty-print response");
993 assertThat(output.get(1)).contains(MY_SOURCE);
// getRetry() must turn a null retry count into zero and pass other values through.
997 public void testGetRetry() {
998 assertEquals(0, oper.getRetry(null));
999 assertEquals(10, oper.getRetry(10));
// Checks that an OperationPartial which leaves getRetryWaitMs() alone reports
// OperationPartial.DEFAULT_RETRY_WAIT_MS.
1003 public void testGetRetryWait() {
1004 // need an operator that doesn't override the retry time
// the shared "oper" is a MyOper, which overrides getRetryWaitMs(); an anonymous
// subclass with an empty body is used here so the inherited implementation is exercised
1005 OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1006 assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
// Checks getTargetEntity() in two states: before any property is set it must
// yield MY_TARGET_ENTITY, and after the AAI_TARGET_ENTITY property is set on the
// operation it must yield that property's value instead.
1010 public void testGetTargetEntity() {
1011 // get it from the params
1012 assertEquals(MY_TARGET_ENTITY, oper.getTargetEntity());
1014 // now get it from the properties
// "entityX" differs from MY_TARGET_ENTITY's expected value, so the second assert
// can only pass if the property is what getTargetEntity() now returns
// (assumes MY_TARGET_ENTITY is not "entityX" - TODO confirm against the constant)
1015 oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, "entityX");
1016 assertEquals("entityX", oper.getTargetEntity());
// Checks getTimeoutMs(): a timeout in seconds is expected back in milliseconds
// (TIMEOUT * 1000), and a null timeout is expected to map to 0.
1020 public void testGetTimeOutMs() {
// presumably params was built with timeoutSec == TIMEOUT; the setup is outside this view
1021 assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
// rebuild the params with the timeout cleared
1023 params = params.toBuilder().timeoutSec(null).build();
1025 // new params, thus need a new operation
1026 oper = new MyOper();
1028 assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
// Receives an OperationOutcome and records data from it for later verification;
// presumably registered as the "start" callback (registration is outside this view).
// NOTE(review): the parameter is named "oper" but is an OperationOutcome; it shadows
// the test's "oper" field inside this method.
1031 private void starter(OperationOutcome oper) {
// remember the outcome's start time; verifyRun() compares it with assertSame against
// the start times of the recorded start/end outcomes
1033 tstart = oper.getStart();
// Receives an OperationOutcome; presumably the "complete" counterpart of starter().
// NOTE(review): the body is not visible here, so its bookkeeping is not described;
// as in starter(), the parameter name "oper" shadows the test's "oper" field.
1038 private void completer(OperationOutcome oper) {
// Used by the four-argument verifyRun() as the default "manipulator" argument,
// i.e. when the caller does not want to touch the future before the executor runs.
1045 * Gets a function that does nothing.
1047 * @param <T> type of input parameter expected by the function
1048 * @return a function that does nothing
1050 private <T> Consumer<T> noop() {
// Builds an outcome from the current params and marks it as SUCCESS.
// NOTE(review): the return statement is not visible here; presumably "outcome" is returned.
1055 private OperationOutcome makeSuccess() {
// makeOutcome() is given null as its argument (meaning of that argument not visible here)
1056 OperationOutcome outcome = params.makeOutcome(null);
1057 outcome.setResult(OperationResult.SUCCESS);
1065 * @param testName test name
1066 * @param expectedCallbacks number of callbacks expected
1067 * @param expectedOperations number of operation invocations expected
1068 * @param expectedResult expected outcome
1070 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1071 OperationResult expectedResult) {
// convenience overload: delegate to the five-argument verifyRun(), passing noop()
// as the manipulator so the future is left untouched
1073 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
1079 * @param testName test name
1080 * @param expectedCallbacks number of callbacks expected
1081 * @param expectedOperations number of operation invocations expected
1082 * @param expectedResult expected outcome
1083 * @param manipulator function to modify the future returned by
1084 * {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1085 * in the executor are run
1087 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1088 OperationResult expectedResult, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
// NOTE(review): any set-up between the signature and the start() call is not visible here
// kick off the operation and keep the future so it can be inspected afterwards
1096 CompletableFuture<OperationOutcome> future = oper.start();
// let the caller act on the future before any queued task is executed
1098 manipulator.accept(future);
// run the queued tasks; runAll() must report true (presumably: everything ran within MAX_REQUESTS)
1100 assertTrue(testName, executor.runAll(MAX_REQUESTS));
// the same expected count applies to both the start and the end callbacks
1102 assertEquals(testName, expectedCallbacks, numStart);
1103 assertEquals(testName, expectedCallbacks, numEnd);
// the recorded outcomes can only be inspected if at least one callback was expected
1105 if (expectedCallbacks > 0) {
1106 assertNotNull(testName, opstart);
1107 assertNotNull(testName, opend);
1108 assertEquals(testName, expectedResult, opend.getResult());
// assertSame: both recorded outcomes must carry the very same start-time object that
// starter() stored in tstart
1110 assertSame(testName, tstart, opstart.getStart());
1111 assertSame(testName, tstart, opend.getStart());
// the future must already be complete and must hold an outcome equal to the recorded end outcome
// NOTE(review): this assertion omits the testName message that the neighbouring assertions pass
1114 assertTrue(future.isDone());
1115 assertEquals(testName, opend, future.get());
1117 // "start" is never final
1118 for (OperationOutcome outcome : starts) {
1119 assertFalse(testName, outcome.isFinalOutcome());
1122 // only the last "complete" is final
// removeLast() takes the last recorded "complete" out of "ends", so the loop below
// only visits the earlier ones
1123 assertTrue(testName, ends.removeLast().isFinalOutcome());
1125 for (OperationOutcome outcome : ends) {
// NOTE(review): this assertion also omits the testName message
1126 assertFalse(outcome.isFinalOutcome());
// future.get() declares these checked exceptions; they are rethrown unchecked with the
// original exception attached as the cause
// NOTE(review): the InterruptedException case does not restore the thread's interrupt flag
1129 } catch (InterruptedException | ExecutionException e) {
1130 throw new IllegalStateException(e);
// when the operation is expected to have been invoked, its sub-request ID must be set
// and must match the ID carried by the recorded start and end outcomes
1133 if (expectedOperations > 0) {
1134 assertNotNull(testName, oper.getSubRequestId());
1135 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
1136 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
// finally, compare the operation's own count with the expected number of invocations
1140 assertEquals(testName, expectedOperations, oper.getCount());
// Used by the logMessage test to reach the "cannot pretty-print request/response"
// logging path: after this call, the replacement operation's coder fails on encode.
1144 * Creates a new {@link #oper} whose coder will throw an exception.
1146 private void setOperCoderException() {
// replace the shared operation with an anonymous MyOper subclass
1147 oper = new MyOper() {
1149 protected Coder getCoder() {
// a StandardCoder in which only the two-argument encode(object, pretty) is replaced
1150 return new StandardCoder() {
1152 public String encode(Object object, boolean pretty) throws CoderException {
// always fail, using the shared EXPECTED_EXCEPTION value as the exception argument
1153 throw new CoderException(EXPECTED_EXCEPTION);
// Minimal payload type handed to logMessage() in the logging test, which expects it
// to be rendered as a JSON object with a single "text" member.
1162 public static class MyData {
// defaults to TEXT (presumably "my-text", the value the logging test looks for - TODO confirm)
1163 private String text = TEXT;
1167 private class MyOper extends OperationPartial {
1169 private int count = 0;
1172 private boolean genException;
1174 private int maxFailures = 0;
1176 private CompletableFuture<OperationOutcome> preProc;
1180 super(OperationPartialTest.this.params, config, PROP_NAMES);
1184 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1187 throw new IllegalStateException(EXPECTED_EXCEPTION);
1190 operation.setSubRequestId(String.valueOf(attempt));
1192 if (count > maxFailures) {
1193 operation.setResult(OperationResult.SUCCESS);
1195 operation.setResult(OperationResult.FAILURE);
1202 protected long getRetryWaitMs() {
1204 * Sleep timers run in the background, but we want to control things via the
1205 * "executor", thus we avoid sleep timers altogether by simply returning 0.