2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
35 import ch.qos.logback.classic.Logger;
36 import java.time.Instant;
37 import java.util.ArrayDeque;
38 import java.util.Arrays;
39 import java.util.Collections;
40 import java.util.Deque;
41 import java.util.LinkedList;
42 import java.util.List;
44 import java.util.Map.Entry;
45 import java.util.UUID;
46 import java.util.concurrent.CompletableFuture;
47 import java.util.concurrent.CompletionException;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.ForkJoinPool;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import java.util.concurrent.atomic.AtomicReference;
54 import java.util.function.Consumer;
55 import java.util.function.Supplier;
56 import java.util.stream.Collectors;
59 import org.junit.AfterClass;
60 import org.junit.Before;
61 import org.junit.BeforeClass;
62 import org.junit.Test;
63 import org.mockito.ArgumentCaptor;
64 import org.mockito.Mock;
65 import org.mockito.MockitoAnnotations;
66 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
67 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
68 import org.onap.policy.common.utils.coder.Coder;
69 import org.onap.policy.common.utils.coder.CoderException;
70 import org.onap.policy.common.utils.coder.StandardCoder;
71 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
72 import org.onap.policy.common.utils.time.PseudoExecutor;
73 import org.onap.policy.controlloop.ControlLoopOperation;
74 import org.onap.policy.controlloop.VirtualControlLoopEvent;
75 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
76 import org.onap.policy.controlloop.actorserviceprovider.Operation;
77 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
78 import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
79 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
80 import org.onap.policy.controlloop.actorserviceprovider.Operator;
81 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
82 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
83 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
84 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
85 import org.slf4j.LoggerFactory;
87 public class OperationPartialTest {
88 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
89 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
// limit handed to executor.runAll() by the tests
90 private static final int MAX_REQUESTS = 100;
// number of loop iterations used by testStartMultiple()
91 private static final int MAX_PARALLEL = 10;
// message of the exceptions that the tests raise on purpose
92 private static final String EXPECTED_EXCEPTION = "expected exception";
// actor, operation and target entity given to the parameters built in setUp()
93 private static final String ACTOR = "my-actor";
94 private static final String OPERATION = "my-operation";
95 private static final String MY_SINK = "my-sink";
96 private static final String MY_SOURCE = "my-source";
97 private static final String MY_TARGET_ENTITY = "my-entity";
98 private static final String TEXT = "my-text";
// passed to the params builder's timeoutSec() in setUp()
99 private static final int TIMEOUT = 1000;
// request id assigned to the event in setUp()
100 private static final UUID REQ_ID = UUID.randomUUID();
// Every OperationResult except SUCCESS.  Streams the values() array directly instead of
// wrapping it in a List first.
102 private static final List<OperationResult> FAILURE_RESULTS = Arrays.stream(OperationResult.values())
103 .filter(result -> result != OperationResult.SUCCESS).collect(Collectors.toList());
106 * Used to attach an appender to the class' logger.
108 private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
109 private static final ExtractAppender appender = new ExtractAppender();
111 private static final List<String> PROP_NAMES = List.of("hello", "world");
// collaborators stubbed with Mockito's when(...) in setUp()
114 private ActorService service;
116 private Actor guardActor;
118 private Operator guardOperator;
120 private Operation guardOperation;
// fixtures created in setUp()
122 private VirtualControlLoopEvent event;
123 private ControlLoopEventContext context;
124 private PseudoExecutor executor;
125 private ControlLoopOperationParams params;
// counter that the tests compare against the expected number of starts
129 private int numStart;
// start time captured by testSleep()
132 private Instant tstart;
// outcomes inspected by the tests - presumably recorded by the start/complete callbacks; confirm
134 private OperationOutcome opstart;
135 private OperationOutcome opend;
137 private Deque<OperationOutcome> starts;
138 private Deque<OperationOutcome> ends;
140 private OperatorConfig config;
143 * Attaches the appender to the logger.
146 public static void setUpBeforeClass() throws Exception {
148 * Attach appender to the logger.
// the appender shares the logger's own context
150 appender.setContext(logger.getLoggerContext());
// "logger" is the logger of OperationPartial, the class under test
153 logger.addAppender(appender);
157 * Stops the appender.
160 public static void tearDownAfterClass() {
165 * Initializes the fields, including {@link #oper}.
168 public void setUp() {
// let Mockito initialize the annotated fields of this test instance
169 MockitoAnnotations.initMocks(this);
// the event, and thus the context, carries the fixed request id
171 event = new VirtualControlLoopEvent();
172 event.setRequestId(REQ_ID);
174 context = new ControlLoopEventContext(event);
// the tests drive this executor explicitly via executor.runAll(...)
175 executor = new PseudoExecutor();
// default parameters; the callbacks are routed to completer()/starter() of this class
177 params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
178 .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
179 .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
// guard chain: service -> guard actor -> guard operator -> guard operation, whose start()
// returns an already-completed future holding makeSuccess()
181 when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
182 when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
183 when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
184 when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
186 config = new OperatorConfig(executor);
// fresh, empty outcome queues for each test
195 starts = new ArrayDeque<>(10);
196 ends = new ArrayDeque<>(10);
// Checks that the operation reports the configured actor name, operation name, and the
// combined "<actor>.<operation>" full name.
200 public void testOperatorPartial_testGetActorName_testGetName() {
201 assertEquals(ACTOR, oper.getActorName());
202 assertEquals(OPERATION, oper.getName());
203 assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
// Submits a task to the blocking executor of an anonymous OperatorPartial and checks that it runs.
207 public void testGetBlockingThread() throws Exception {
208 CompletableFuture<Void> future = new CompletableFuture<>();
210 // use the real executor
211 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
213 public Operation buildOperation(ControlLoopOperationParams params) {
// the submitted task completes the future with null
218 oper2.getBlockingExecutor().execute(() -> future.complete(null));
// wait at most five seconds; a null result means the task ran
220 assertNull(future.get(5, TimeUnit.SECONDS));
// Checks that getPropertyNames() equals the PROP_NAMES list ("hello", "world").
224 public void testGetPropertyNames() {
225 assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
// Stores two string properties and one integer property, then reads each of them back.
229 public void testGetProperty_testSetProperty() {
230 oper.setProperty("propertyA", "valueA");
231 oper.setProperty("propertyB", "valueB");
// non-string value
232 oper.setProperty("propertyC", 20);
234 assertEquals("valueA", oper.getProperty("propertyA"));
235 assertEquals("valueB", oper.getProperty("propertyB"));
236 assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
// Runs the default operation through verifyRun(), expecting SUCCESS.
// NOTE(review): the two counts (1, 1) are presumably the expected callback and operation
// counts - confirm against verifyRun().
240 public void testStart() {
241 verifyRun("testStart", 1, 1, OperationResult.SUCCESS);
245 * Tests start() with multiple running requests.
248 public void testStartMultiple() {
249 for (int count = 0; count < MAX_PARALLEL; ++count) {
// run limit scaled by the number of parallel requests
253 assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
// callbacks were invoked and the last recorded end outcome is a success
255 assertNotNull(opstart);
256 assertNotNull(opend);
257 assertEquals(OperationResult.SUCCESS, opend.getResult());
// every counter should equal the number of parallel requests
259 assertEquals(MAX_PARALLEL, numStart);
260 assertEquals(MAX_PARALLEL, oper.getCount());
261 assertEquals(MAX_PARALLEL, numEnd);
265 * Tests startPreprocessor() when the preprocessor returns a failure.
268 public void testStartPreprocessorFailure() {
// the preprocessor completes normally, but with the outcome from makeFailure()
269 oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
// expected result is FAILURE_GUARD
271 verifyRun("testStartPreprocessorFailure", 1, 0, OperationResult.FAILURE_GUARD);
275 * Tests startPreprocessor() when the preprocessor throws an exception.
278 public void testStartPreprocessorException() {
279 // arrange for the preprocessor to throw an exception
280 oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
// a failed preprocessor future is expected to yield FAILURE_GUARD as well
282 verifyRun("testStartPreprocessorException", 1, 0, OperationResult.FAILURE_GUARD);
286 * Tests startPreprocessor() when the pipeline is not running.
289 public void testStartPreprocessorNotRunning() {
290 // arrange for the preprocessor to return success, which will be ignored
// NOTE(review): the commented-out setGuard() call below looks stale - remove it, or restore
// it using the current setPreProc() API.
291 // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
// start the pipeline and cancel it right away, before the executor runs anything
293 oper.start().cancel(false);
294 assertTrue(executor.runAll(MAX_REQUESTS));
// none of the counters should have advanced
299 assertEquals(0, numStart);
300 assertEquals(0, oper.getCount());
301 assertEquals(0, numEnd);
305 * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
308 public void testStartPreprocessorBuilderException() {
// the override throws while the preprocessor is being built, not from a returned future
309 oper = new MyOper() {
311 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
312 throw new IllegalStateException(EXPECTED_EXCEPTION);
// the exception is expected to propagate straight out of start()
316 assertThatIllegalStateException().isThrownBy(() -> oper.start());
318 // should be nothing in the queue
319 assertEquals(0, executor.getQueueLength());
// Checks that startPreprocessorAsync() returns null for the test operation.
323 public void testStartPreprocessorAsync() {
324 assertNull(oper.startPreprocessorAsync());
// Checks that startGuardAsync() completes successfully via the stubbed guard operation and
// that the guard operator receives the expected parameters.
328 public void testStartGuardAsync() throws Exception {
329 CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
// the stubbed guard operation returns an already-completed future (see setUp())
330 assertTrue(future.isDone());
331 assertEquals(OperationResult.SUCCESS, future.get().getResult());
333 // verify the parameters that were passed
334 ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
335 ArgumentCaptor.forClass(ControlLoopOperationParams.class);
336 verify(guardOperator).buildOperation(paramsCaptor.capture());
// note: this overwrites the "params" field with the captured guard parameters
338 params = paramsCaptor.getValue();
339 assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
340 assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
// retry and timeout are expected to be unset in the guard parameters
341 assertNull(params.getRetry());
342 assertNull(params.getTimeoutSec());
344 Map<String, Object> payload = params.getPayload();
345 assertNotNull(payload);
// the payload should equal what makeGuardPayload() produces
347 assertEquals(oper.makeGuardPayload(), payload);
351 * Tests startGuardAsync() when preprocessing is disabled.
354 public void testStartGuardAsyncDisabled() {
// rebuild the parameters with the "preprocessed" flag set
355 params = params.toBuilder().preprocessed(true).build();
// a fresh operation - presumably picking up the new params; confirm in MyOper - returns no guard future
356 assertNull(new MyOper().startGuardAsync());
// Checks the content of the guard payload, without and with a closed-loop control name.
360 public void testMakeGuardPayload() {
361 Map<String, Object> payload = oper.makeGuardPayload();
// the payload must carry the very same request id object
362 assertSame(REQ_ID, payload.get("requestId"));
364 // request id changes, so remove it
365 payload.remove("requestId");
// the string comparison also pins the iteration order of the payload map
367 assertEquals("{actor=my-actor, operation=my-operation, target=my-entity}", payload.toString());
369 // repeat, but with closed loop name
370 event.setClosedLoopControlName("my-loop");
371 payload = oper.makeGuardPayload();
372 payload.remove("requestId");
373 assertEquals("{actor=my-actor, operation=my-operation, target=my-entity, clname=my-loop}", payload.toString());
// Checks that the operation count is one once the executor has been drained.
377 public void testStartOperationAsync() {
379 assertTrue(executor.runAll(MAX_REQUESTS));
381 assertEquals(1, oper.getCount());
// Checks isSuccess(): false for a null outcome, true for SUCCESS, false for every failure result.
385 public void testIsSuccess() {
386 assertFalse(oper.isSuccess(null));
388 OperationOutcome outcome = new OperationOutcome();
390 outcome.setResult(OperationResult.SUCCESS);
391 assertTrue(oper.isSuccess(outcome));
// the same outcome object is reused for each failure flavor
393 for (OperationResult failure : FAILURE_RESULTS) {
394 outcome.setResult(failure);
395 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
// Checks isActorFailed(): only a FAILURE result with matching actor and operation is reported as true.
400 public void testIsActorFailed() {
401 assertFalse(oper.isActorFailed(null));
403 OperationOutcome outcome = params.makeOutcome(null);
// results other than FAILURE are not reported as actor failures
406 outcome.setResult(OperationResult.SUCCESS);
407 assertFalse(oper.isActorFailed(outcome));
409 outcome.setResult(OperationResult.FAILURE_RETRIES);
410 assertFalse(oper.isActorFailed(outcome));
// from here on the result is FAILURE; only the actor and operation vary
413 outcome.setResult(OperationResult.FAILURE);
// mismatched or missing actor, then restore the correct one
416 outcome.setActor(MY_SINK);
417 assertFalse(oper.isActorFailed(outcome));
418 outcome.setActor(null);
419 assertFalse(oper.isActorFailed(outcome));
420 outcome.setActor(ACTOR);
422 // incorrect operation
423 outcome.setOperation(MY_SINK);
424 assertFalse(oper.isActorFailed(outcome));
425 outcome.setOperation(null);
426 assertFalse(oper.isActorFailed(outcome));
427 outcome.setOperation(OPERATION);
// result, actor and operation all match now
430 assertTrue(oper.isActorFailed(outcome));
// Checks that an operation relying on the inherited doOperation() ends with FAILURE_EXCEPTION.
434 public void testDoOperation() {
436 * Use an operation that doesn't override doOperation().
// anonymous subclass with an empty body and an empty property-name list
438 OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
441 assertTrue(executor.runAll(MAX_REQUESTS));
443 assertNotNull(opend);
444 assertEquals(OperationResult.FAILURE_EXCEPTION, opend.getResult());
// Checks that an operation whose future does not complete in time ends with FAILURE_TIMEOUT.
448 public void testTimeout() throws Exception {
450 // use a real executor
451 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
453 // trigger timeout very quickly
454 oper = new MyOper() {
456 protected long getTimeoutMs(Integer timeoutSec) {
461 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
// NOTE(review): outcome2 is built and set to SUCCESS, but the handler below returns
// "outcome"; outcome2 is unused in the visible lines - verify which one was intended.
463 OperationOutcome outcome2 = params.makeOutcome(null);
464 outcome2.setResult(OperationResult.SUCCESS);
467 * Create an incomplete future that will timeout after the operation's
468 * timeout. If it fires before the other timer, then it will return a
471 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
// one-second safety timeout; the handler ignores both the value and the exception
472 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
473 params.getExecutor());
// blocks on the real executor until the operation finishes
479 assertEquals(OperationResult.FAILURE_TIMEOUT, oper.start().get().getResult());
483 * Tests retry functions, when the count is set to zero and retries are exhausted.
486 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
// explicit retry count of zero
487 params = params.toBuilder().retry(0).build();
489 // new params, thus need a new operation
// arrange for up to ten failures; with zero retries, plain FAILURE is expected
492 oper.setMaxFailures(10);
494 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, OperationResult.FAILURE);
498 * Tests retry functions, when the count is null and retries are exhausted.
501 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
// no retry count at all
502 params = params.toBuilder().retry(null).build();
504 // new params, thus need a new operation
// same expectation as with a retry count of zero: plain FAILURE
507 oper.setMaxFailures(10);
509 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, OperationResult.FAILURE);
513 * Tests retry functions, when retries are exhausted.
516 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
517 final int maxRetries = 3;
518 params = params.toBuilder().retry(maxRetries).build();
520 // new params, thus need a new operation
// more failures (10) are arranged than retries are allowed (3)
523 oper.setMaxFailures(10);
// initial attempt plus maxRetries retries, ending in FAILURE_RETRIES
525 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
526 OperationResult.FAILURE_RETRIES);
530 * Tests retry functions, when a success follows some retries.
533 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
// more retries (10) are allowed than failures are arranged (3)
534 params = params.toBuilder().retry(10).build();
536 // new params, thus need a new operation
539 final int maxFailures = 3;
540 oper.setMaxFailures(maxFailures);
// maxFailures failed attempts followed by one that is expected to succeed
542 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
543 OperationResult.SUCCESS);
547 * Tests retry functions, when the outcome is {@code null}.
550 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
552 // arrange to return null from doOperation()
553 oper = new MyOper() {
555 protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
// still invokes the superclass implementation; its return value is not used on this line
558 super.doOperation(attempt, outcome);
// expects FAILURE; the extra noop() argument is passed through to verifyRun()
563 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, OperationResult.FAILURE, noop());
// Checks sleep(): negative and zero durations yield an already-completed future, and a
// 100 ms sleep takes at least 99 ms.
567 public void testSleep() throws Exception {
// negative sleep time
568 CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
569 assertTrue(future.isDone());
570 assertNull(future.get());
// zero sleep time
573 future = oper.sleep(0, TimeUnit.SECONDS);
574 assertTrue(future.isDone());
575 assertNull(future.get());
578 * Start a second sleep we can use to check the first while it's running.
580 tstart = Instant.now();
581 future = oper.sleep(100, TimeUnit.MILLISECONDS);
583 CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
585 // wait for second to complete and verify that the first has not completed
587 assertFalse(future.isDone());
589 // wait for first to complete
// elapsed wall-clock time since the first (100 ms) sleep was started
592 long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
593 assertTrue(diff >= 99);
// Checks isSameOperation(): true only when both the actor and the operation of the outcome match.
597 public void testIsSameOperation() {
598 assertFalse(oper.isSameOperation(null));
600 OperationOutcome outcome = params.makeOutcome(null);
602 // wrong actor - should be false
603 outcome.setActor(null);
604 assertFalse(oper.isSameOperation(outcome));
605 outcome.setActor(MY_SINK);
606 assertFalse(oper.isSameOperation(outcome));
607 outcome.setActor(ACTOR);
609 // wrong operation - should be false
610 outcome.setOperation(null);
611 assertFalse(oper.isSameOperation(outcome));
612 outcome.setOperation(MY_SINK);
613 assertFalse(oper.isSameOperation(outcome));
614 outcome.setOperation(OPERATION);
// actor and operation both match now
616 assertTrue(oper.isSameOperation(outcome));
620 * Tests handlePreprocessorFailure() when the outcome is a success.
623 public void testHandlePreprocessorFailureSuccess() {
// the preprocessor completes with the outcome from makeSuccess()
624 oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
// the label now matches this test's name, as in the other verifyRun() callers
625 verifyRun("testHandlePreprocessorFailureSuccess", 1, 1, OperationResult.SUCCESS);
629 * Tests handlePreprocessorFailure() when the outcome is <i>not</i> a success.
632 public void testHandlePreprocessorFailureFailed() throws Exception {
// the preprocessor completes with the outcome from makeFailure()
633 oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
// the label now matches this test's name, as in the other verifyRun() callers
634 verifyRun("testHandlePreprocessorFailureFailed", 1, 0, OperationResult.FAILURE_GUARD);
638 * Tests handlePreprocessorFailure() when the outcome is {@code null}.
641 public void testHandlePreprocessorFailureNull() throws Exception {
642 // arrange to return a null outcome from the preprocessor
643 oper.setPreProc(CompletableFuture.completedFuture(null));
// a null outcome is expected to be treated like a guard failure
644 verifyRun("testHandlePreprocessorFailureNull", 1, 0, OperationResult.FAILURE_GUARD);
// Checks that an exception raised while the operation runs ends in FAILURE_EXCEPTION.
648 public void testFromException() {
649 // arrange to generate an exception when operation runs
650 oper.setGenException(true);
652 verifyRun("testFromException", 1, 1, OperationResult.FAILURE_EXCEPTION);
656 * Tests fromException() when there is no exception.
659 public void testFromExceptionNoExcept() {
// default operation with no exception arranged: expect SUCCESS
660 verifyRun("testFromExceptionNoExcept", 1, 1, OperationResult.SUCCESS);
664 * Tests both flavors of anyOf(), because one invokes the other.
667 public void testAnyOf() throws Exception {
668 // first task completes, others do not
669 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
671 final OperationOutcome outcome = params.makeOutcome(null);
// mix of a completed future, never-completing futures, and a supplier that returns null
673 tasks.add(() -> CompletableFuture.completedFuture(outcome));
674 tasks.add(() -> new CompletableFuture<>());
675 tasks.add(() -> null);
676 tasks.add(() -> new CompletableFuture<>());
678 CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
679 assertTrue(executor.runAll(MAX_REQUESTS));
680 assertTrue(result.isDone());
681 assertSame(outcome, result.get());
683 // repeat using array form
684 @SuppressWarnings("unchecked")
685 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
686 result = oper.anyOf(tasks.toArray(taskArray));
687 assertTrue(executor.runAll(MAX_REQUESTS));
688 assertTrue(result.isDone());
689 assertSame(outcome, result.get());
691 // second task completes, others do not
693 tasks.add(() -> new CompletableFuture<>());
694 tasks.add(() -> CompletableFuture.completedFuture(outcome));
695 tasks.add(() -> new CompletableFuture<>());
697 result = oper.anyOf(tasks);
698 assertTrue(executor.runAll(MAX_REQUESTS));
699 assertTrue(result.isDone());
700 assertSame(outcome, result.get());
702 // third task completes, others do not
704 tasks.add(() -> new CompletableFuture<>());
705 tasks.add(() -> new CompletableFuture<>());
706 tasks.add(() -> CompletableFuture.completedFuture(outcome));
708 result = oper.anyOf(tasks);
709 assertTrue(executor.runAll(MAX_REQUESTS));
710 assertTrue(result.isDone());
711 assertSame(outcome, result.get());
715 * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
718 @SuppressWarnings("unchecked")
719 public void testAnyOfEdge() throws Exception {
720 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
722 // zero items: check both using a list and using an array
723 assertNull(oper.anyOf(tasks));
724 assertNull(oper.anyOf());
726 // one item: check both using a list and using an array
727 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
728 tasks.add(() -> future1);
// with a single task, the task's own future is expected back (same instance)
730 assertSame(future1, oper.anyOf(tasks));
731 assertSame(future1, oper.anyOf(() -> future1));
// Checks the varargs form of allOf(): the result completes only after every non-null task future has completed.
735 public void testAllOfArray() throws Exception {
736 final OperationOutcome outcome = params.makeOutcome(null);
738 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
739 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
740 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
// the third supplier returns null instead of a future
742 @SuppressWarnings("unchecked")
743 CompletableFuture<OperationOutcome> result =
744 oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
// nothing has completed yet
746 assertTrue(executor.runAll(MAX_REQUESTS));
747 assertFalse(result.isDone());
748 future1.complete(outcome);
750 // complete 3 before 2
751 assertTrue(executor.runAll(MAX_REQUESTS));
752 assertFalse(result.isDone());
753 future3.complete(outcome);
755 assertTrue(executor.runAll(MAX_REQUESTS));
756 assertFalse(result.isDone());
757 future2.complete(outcome);
759 // all of them are now done
760 assertTrue(executor.runAll(MAX_REQUESTS));
761 assertTrue(result.isDone());
762 assertSame(outcome, result.get());
// Checks the list form of allOf(): the result completes only after every non-null task future has completed.
766 public void testAllOfList() throws Exception {
767 final OperationOutcome outcome = params.makeOutcome(null);
769 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
770 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
771 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
// same task mix as the array test, including a supplier that returns null
773 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
774 tasks.add(() -> future1);
775 tasks.add(() -> future2);
776 tasks.add(() -> null);
777 tasks.add(() -> future3);
779 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
// nothing has completed yet
781 assertTrue(executor.runAll(MAX_REQUESTS));
782 assertFalse(result.isDone());
783 future1.complete(outcome);
785 // complete 3 before 2
786 assertTrue(executor.runAll(MAX_REQUESTS));
787 assertFalse(result.isDone());
788 future3.complete(outcome);
790 assertTrue(executor.runAll(MAX_REQUESTS));
791 assertFalse(result.isDone());
792 future2.complete(outcome);
794 // all of them are now done
795 assertTrue(executor.runAll(MAX_REQUESTS));
796 assertTrue(result.isDone());
797 assertSame(outcome, result.get());
801 * Tests both flavors of allOf(), for edge cases: zero items, and one item.
804 @SuppressWarnings("unchecked")
805 public void testAllOfEdge() throws Exception {
806 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
808 // zero items: check both using a list and using an array
809 assertNull(oper.allOf(tasks));
810 assertNull(oper.allOf());
812 // one item: check both using a list and using an array
813 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
814 tasks.add(() -> future1);
// with a single task, the task's own future is expected back (same instance)
816 assertSame(future1, oper.allOf(tasks));
817 assertSame(future1, oper.allOf(() -> future1));
// Checks that futures created before a task supplier throws are cancelled, while the future
// of a later task is not.
821 public void testAttachFutures() throws Exception {
822 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
824 // third task throws an exception during construction
825 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
826 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
827 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
828 tasks.add(() -> future1);
829 tasks.add(() -> future2);
831 throw new IllegalStateException(EXPECTED_EXCEPTION);
833 tasks.add(() -> future3);
// the supplier's exception is expected to escape anyOf() unchanged
835 assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
837 // should have canceled the first two, but not the last
838 assertTrue(future1.isCancelled());
839 assertTrue(future2.isCancelled());
840 assertFalse(future3.isCancelled());
// Checks how allOf() combines the outcomes of its tasks: which outcome is returned, that a
// null outcome takes precedence over a success, and that an exception is propagated.
844 public void testCombineOutcomes() throws Exception {
// a single task: its outcome is the expected one
846 verifyOutcomes(0, OperationResult.SUCCESS);
847 verifyOutcomes(0, OperationResult.FAILURE_EXCEPTION);
849 // maximum is in different positions
850 verifyOutcomes(0, OperationResult.FAILURE, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD);
851 verifyOutcomes(1, OperationResult.SUCCESS, OperationResult.FAILURE, OperationResult.FAILURE_GUARD);
852 verifyOutcomes(2, OperationResult.SUCCESS, OperationResult.FAILURE_GUARD, OperationResult.FAILURE);
854 // null outcome - takes precedence over a success
855 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
856 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
857 tasks.add(() -> CompletableFuture.completedFuture(null));
858 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
859 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
861 assertTrue(executor.runAll(MAX_REQUESTS));
862 assertTrue(result.isDone());
863 assertNull(result.get());
865 // one throws an exception during execution
866 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
869 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
870 tasks.add(() -> CompletableFuture.failedFuture(except));
871 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome(null)));
872 result = oper.allOf(tasks);
874 assertTrue(executor.runAll(MAX_REQUESTS));
875 assertTrue(result.isCompletedExceptionally());
// Extract the exception via handle(), which runs immediately because the future is already
// complete, and assert on the test thread.  Asserting inside a whenComplete() callback only
// fails the discarded dependent future, so that check could never fail the test.  A
// CompletionException wrapper, if any, is unwrapped before comparing.
876 Throwable thrown = result.handle((unused, thr) -> thr).get();
assertSame(except, thrown instanceof CompletionException ? thrown.getCause() : thrown);
880 * Tests both flavors of sequence(), because one invokes the other.
883 public void testSequence() throws Exception {
884 final OperationOutcome outcome = params.makeOutcome(null);
// all tasks yield the same outcome; the second supplier returns null instead of a future
886 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
887 tasks.add(() -> CompletableFuture.completedFuture(outcome));
888 tasks.add(() -> null);
889 tasks.add(() -> CompletableFuture.completedFuture(outcome));
890 tasks.add(() -> CompletableFuture.completedFuture(outcome));
892 CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
893 assertTrue(executor.runAll(MAX_REQUESTS));
894 assertTrue(result.isDone());
895 assertSame(outcome, result.get());
897 // repeat using array form
898 @SuppressWarnings("unchecked")
899 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
900 result = oper.sequence(tasks.toArray(taskArray));
901 assertTrue(executor.runAll(MAX_REQUESTS));
902 assertTrue(result.isDone());
903 assertSame(outcome, result.get());
905 // second task fails, third should not run
906 OperationOutcome failure = params.makeOutcome(null);
907 failure.setResult(OperationResult.FAILURE);
909 tasks.add(() -> CompletableFuture.completedFuture(outcome));
910 tasks.add(() -> CompletableFuture.completedFuture(failure));
911 tasks.add(() -> CompletableFuture.completedFuture(outcome));
// the failing outcome itself is expected as the result of the sequence
913 result = oper.sequence(tasks);
914 assertTrue(executor.runAll(MAX_REQUESTS));
915 assertTrue(result.isDone());
916 assertSame(failure, result.get());
920 * Tests both flavors of sequence(), for edge cases: zero items, and one item.
923 @SuppressWarnings("unchecked")
924 public void testSequenceEdge() throws Exception {
925 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
927 // zero items: check both using a list and using an array
928 assertNull(oper.sequence(tasks));
929 assertNull(oper.sequence());
931 // one item: check both using a list and using an array
932 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
933 tasks.add(() -> future1);
// with a single task, the task's own future is expected back (same instance)
935 assertSame(future1, oper.sequence(tasks));
936 assertSame(future1, oper.sequence(() -> future1));
// Helper: builds one already-completed task per entry of "results", combines them with
// allOf(), and asserts that the combined future yields the outcome created for index "expected".
//   expected - index into "results" of the outcome that allOf() should return
//   results  - result assigned to each task's outcome, in task order
939 private void verifyOutcomes(int expected, OperationResult... results) throws Exception {
940 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
942 OperationOutcome expectedOutcome = null;
944 for (int count = 0; count < results.length; ++count) {
945 OperationOutcome outcome = params.makeOutcome(null);
946 outcome.setResult(results[count]);
947 tasks.add(() -> CompletableFuture.completedFuture(outcome));
// remember the outcome object that should be returned
949 if (count == expected) {
950 expectedOutcome = outcome;
954 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
956 assertTrue(executor.runAll(MAX_REQUESTS));
957 assertTrue(result.isDone());
// identity check: the very same outcome object must be returned
958 assertSame(expectedOutcome, result.get());
// Checks the priority that detmPriority() assigns to a null outcome, to each listed result,
// and to an outcome whose result is null.
962 public void testDetmPriority() throws CoderException {
// a null outcome has priority 1
963 assertEquals(1, oper.detmPriority(null));
965 OperationOutcome outcome = params.makeOutcome(null);
// expected priority for each result
967 Map<OperationResult, Integer> map = Map.of(OperationResult.SUCCESS, 0, OperationResult.FAILURE_GUARD, 2,
968 OperationResult.FAILURE_RETRIES, 3, OperationResult.FAILURE, 4, OperationResult.FAILURE_TIMEOUT, 5,
969 OperationResult.FAILURE_EXCEPTION, 6);
971 for (Entry<OperationResult, Integer> ent : map.entrySet()) {
972 outcome.setResult(ent.getKey());
// the result name is used as the assertion message
973 assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
977 * Test null result. We can't actually set it to null, because the set() method
978 * won't allow it. Instead, we decode it from a structure.
980 outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
981 assertEquals(1, oper.detmPriority(outcome));
985 * Tests callbackStarted() when the pipeline has already been stopped.
988 public void testCallbackStartedNotRunning() {
// holder that lets the callback reach the future returned by start(), which is set afterwards
989 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
992 * arrange to stop the controller when the start-callback is invoked, but capture
// note: the lambda parameter "oper" shadows the field of the same name
995 params = params.toBuilder().startCallback(oper -> {
997 future.get().cancel(false);
1000 // new params, thus need a new operation
1001 oper = new MyOper();
1003 future.set(oper.start());
1004 assertTrue(executor.runAll(MAX_REQUESTS));
1006 // should have only run once
1007 assertEquals(1, numStart);
1011 * Tests callbackCompleted() when the pipeline has already been stopped.
1014 public void testCallbackCompletedNotRunning() {
// holder that lets the callback reach the future returned by start(), which is set afterwards
1015 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1017 // arrange to stop the controller when the start-callback is invoked
// note: the lambda parameter "oper" shadows the field of the same name
1018 params = params.toBuilder().startCallback(oper -> {
1019 future.get().cancel(false);
1022 // new params, thus need a new operation
1023 oper = new MyOper();
1025 future.set(oper.start());
1026 assertTrue(executor.runAll(MAX_REQUESTS));
1028 // should not have been set
// the completion counter must stay at zero
1030 assertEquals(0, numEnd);
1034 public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1035 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1037 OperationOutcome outcome;
1039 outcome = new OperationOutcome();
1040 oper.setOutcome(outcome, timex);
1041 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1042 assertEquals(OperationResult.FAILURE_TIMEOUT, outcome.getResult());
1044 outcome = new OperationOutcome();
1045 oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1046 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1047 assertEquals(OperationResult.FAILURE_EXCEPTION, outcome.getResult());
1051 public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1052 OperationOutcome outcome;
1054 outcome = new OperationOutcome();
1055 oper.setOutcome(outcome, OperationResult.SUCCESS);
1056 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1057 assertEquals(OperationResult.SUCCESS, outcome.getResult());
1059 oper.setOutcome(outcome, OperationResult.SUCCESS);
1060 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1061 assertEquals(OperationResult.SUCCESS, outcome.getResult());
1063 for (OperationResult result : FAILURE_RESULTS) {
1064 outcome = new OperationOutcome();
1065 oper.setOutcome(outcome, result);
1066 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1067 assertEquals(result.toString(), result, outcome.getResult());
1072 public void testIsTimeout() {
1073 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1075 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1076 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1077 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1078 assertFalse(oper.isTimeout(new CompletionException(null)));
1079 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1081 assertTrue(oper.isTimeout(timex));
1082 assertTrue(oper.isTimeout(new CompletionException(timex)));
1086 public void testLogMessage() {
1087 final String infraStr = SINK_INFRA.toString();
1089 // log structured data
1090 appender.clearExtractions();
1091 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1092 List<String> output = appender.getExtracted();
1093 assertEquals(1, output.size());
1095 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1096 .contains("{\n \"text\": \"my-text\"\n}");
1098 // repeat with a response
1099 appender.clearExtractions();
1100 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1101 output = appender.getExtracted();
1102 assertEquals(1, output.size());
1104 assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1105 .contains("{\n \"text\": \"my-text\"\n}");
1107 // log a plain string
1108 appender.clearExtractions();
1109 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1110 output = appender.getExtracted();
1111 assertEquals(1, output.size());
1112 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1114 // log a null request
1115 appender.clearExtractions();
1116 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1117 output = appender.getExtracted();
1118 assertEquals(1, output.size());
1120 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1122 // generate exception from coder
1123 setOperCoderException();
1125 appender.clearExtractions();
1126 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1127 output = appender.getExtracted();
1128 assertEquals(2, output.size());
1129 assertThat(output.get(0)).contains("cannot pretty-print request");
1130 assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1132 // repeat with a response
1133 appender.clearExtractions();
1134 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1135 output = appender.getExtracted();
1136 assertEquals(2, output.size());
1137 assertThat(output.get(0)).contains("cannot pretty-print response");
1138 assertThat(output.get(1)).contains(MY_SOURCE);
1142 public void testGetRetry() {
1143 assertEquals(0, oper.getRetry(null));
1144 assertEquals(10, oper.getRetry(10));
1148 public void testGetRetryWait() {
1149 // need an operator that doesn't override the retry time
1150 OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1151 assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1155 public void testGetTargetEntity() {
1156 // get it from the params
1157 assertEquals(MY_TARGET_ENTITY, oper.getTargetEntity());
1159 // now get it from the properties
1160 oper.setProperty(OperationProperties.AAI_TARGET_ENTITY, "entityX");
1161 assertEquals("entityX", oper.getTargetEntity());
1165 public void testGetTimeOutMs() {
1166 assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1168 params = params.toBuilder().timeoutSec(null).build();
1170 // new params, thus need a new operation
1171 oper = new MyOper();
1173 assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
// Helper taking an OperationOutcome; the visible statement copies the outcome's
// start time into tstart.
// NOTE(review): the embedded line numbers skip values around that statement, so the
// rest of the body is not visible in this extract - confirm against the full file.
1176 private void starter(OperationOutcome oper) {
1178 tstart = oper.getStart();
// Helper taking an OperationOutcome.
// NOTE(review): only the signature is visible in this extract; presumably the
// counterpart of starter() for completions - confirm against the full file.
1183 private void completer(OperationOutcome oper) {
// Used by the four-argument verifyRun() as the "manipulator" it hands to the
// five-argument overload.
// NOTE(review): the body of this method is not visible in this extract.
1190 * Gets a function that does nothing.
1192 * @param <T> type of input parameter expected by the function
1193 * @return a function that does nothing
1195 private <T> Consumer<T> noop() {
// Builds an outcome via params.makeOutcome(null) and marks it SUCCESS.
// NOTE(review): the remaining lines of the body (presumably returning the outcome)
// are not visible in this extract.
1200 private OperationOutcome makeSuccess() {
1201 OperationOutcome outcome = params.makeOutcome(null);
1202 outcome.setResult(OperationResult.SUCCESS);
// Builds an outcome via params.makeOutcome(null) and marks it FAILURE.
// NOTE(review): the remaining lines of the body (presumably returning the outcome)
// are not visible in this extract.
1207 private OperationOutcome makeFailure() {
1208 OperationOutcome outcome = params.makeOutcome(null);
1209 outcome.setResult(OperationResult.FAILURE);
1217 * @param testName test name
1218 * @param expectedCallbacks number of callbacks expected
1219 * @param expectedOperations number of operation invocations expected
1220 * @param expectedResult expected outcome
1222 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1223 OperationResult expectedResult) {
1225 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
// Starts the operation, lets "manipulator" act on the returned future, runs the
// executor's tasks, and then checks callback counts, recorded outcomes and the
// operation count against the expectations.
// NOTE(review): the embedded line numbers skip several values between the signature
// and the call to oper.start(), and again before "assertTrue(future.isDone())", so
// some original lines are not visible in this extract.
// NOTE(review): the {@link} below names start(ControlLoopOperationParams), but the
// call in the body is oper.start() with no arguments - confirm the link target.
1231 * @param testName test name
1232 * @param expectedCallbacks number of callbacks expected
1233 * @param expectedOperations number of operation invocations expected
1234 * @param expectedResult expected outcome
1235 * @param manipulator function to modify the future returned by
1236 * {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1237 * in the executor are run
1239 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1240 OperationResult expectedResult, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1248 CompletableFuture<OperationOutcome> future = oper.start();
// give the caller a chance to act on the future before anything is executed
1250 manipulator.accept(future);
1252 assertTrue(testName, executor.runAll(MAX_REQUESTS));
// start and end callbacks are expected in equal numbers
1254 assertEquals(testName, expectedCallbacks, numStart);
1255 assertEquals(testName, expectedCallbacks, numEnd);
1257 if (expectedCallbacks > 0) {
1258 assertNotNull(testName, opstart);
1259 assertNotNull(testName, opend);
1260 assertEquals(testName, expectedResult, opend.getResult());
// assertSame: both recorded outcomes must carry the very same start-time object as tstart
1262 assertSame(testName, tstart, opstart.getStart());
1263 assertSame(testName, tstart, opend.getStart());
// NOTE(review): this assertion omits testName, unlike its neighbours
1266 assertTrue(future.isDone());
1267 assertEquals(testName, opend, future.get());
1269 // "start" is never final
1270 for (OperationOutcome outcome : starts) {
1271 assertFalse(testName, outcome.isFinalOutcome());
1274 // only the last "complete" is final
// removeLast() presumably takes the final entry out of "ends" (its declaration is not
// visible here), so the loop that follows only sees the earlier entries
1275 assertTrue(testName, ends.removeLast().isFinalOutcome());
1277 for (OperationOutcome outcome : ends) {
// NOTE(review): this assertion omits testName, unlike the matching loop over "starts"
1278 assertFalse(outcome.isFinalOutcome());
// NOTE(review): InterruptedException is wrapped without restoring the thread's
// interrupt flag - confirm that is acceptable for this test
1281 } catch (InterruptedException | ExecutionException e) {
1282 throw new IllegalStateException(e);
1285 if (expectedOperations > 0) {
// the operation's sub-request id must match the one on both recorded outcomes
1286 assertNotNull(testName, oper.getSubRequestId());
1287 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
1288 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
1292 assertEquals(testName, expectedOperations, oper.getCount());
1296 * Creates a new {@link #oper} whose coder will throw an exception.
1298 private void setOperCoderException() {
1299 oper = new MyOper() {
1301 protected Coder getCoder() {
1302 return new StandardCoder() {
1304 public String encode(Object object, boolean pretty) throws CoderException {
1305 throw new CoderException(EXPECTED_EXCEPTION);
// Payload type passed to logMessage() in testLogMessage(); its single field starts
// out as TEXT.
// NOTE(review): the lines just above this class are not visible in this extract, so
// any annotations on it cannot be confirmed from here.
1314 public static class MyData {
1315 private String text = TEXT;
// OperationPartial subclass used as the operation under test; as an inner class it
// reads the enclosing test's params when calling the superclass constructor.
// NOTE(review): the embedded line numbers skip values between the members below
// (including the constructor's own signature line), so this listing is partial.
1319 private class MyOper extends OperationPartial {
// compared against maxFailures in doOperation()
1321 private int count = 0;
// presumably makes doOperation() throw when set - the guarding line is not visible here
1324 private boolean genException;
// doOperation() reports SUCCESS once count exceeds this value
1326 private int maxFailures = 0;
// when non-null, returned by startPreprocessorAsync() in place of the superclass result
1328 private CompletableFuture<OperationOutcome> preProc;
1332 super(OperationPartialTest.this.params, config, PROP_NAMES);
// Test implementation of the operation step. The visible statements tag the outcome
// with the attempt number as its sub-request id and set SUCCESS when count exceeds
// maxFailures.
// NOTE(review): the embedded line numbers skip values here; the condition guarding
// the throw, the branch that leads to FAILURE, and the return are not visible in
// this extract - confirm against the full file.
1336 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1339 throw new IllegalStateException(EXPECTED_EXCEPTION);
// sub-request id is simply the attempt number rendered as a string
1342 operation.setSubRequestId(String.valueOf(attempt));
1344 if (count > maxFailures) {
1345 operation.setResult(OperationResult.SUCCESS);
1347 operation.setResult(OperationResult.FAILURE);
// Supplies the retry wait for this test operation; per the note below, the intent is
// to return 0 so that no sleep timers are involved.
// NOTE(review): the return statement itself is not visible in this extract.
1354 protected long getRetryWaitMs() {
1356 * Sleep timers run in the background, but we want to control things via the
1357 * "executor", thus we avoid sleep timers altogether by simply returning 0.
// Lets a test inject its own preprocessor future: returns preProc when it has been
// set, otherwise defers to the superclass implementation.
1363 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
1364 return (preProc != null ? preProc : super.startPreprocessorAsync());