2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
35 import ch.qos.logback.classic.Logger;
36 import java.time.Instant;
37 import java.util.ArrayDeque;
38 import java.util.Arrays;
39 import java.util.Collections;
40 import java.util.Deque;
41 import java.util.LinkedList;
42 import java.util.List;
44 import java.util.Map.Entry;
45 import java.util.UUID;
46 import java.util.concurrent.CompletableFuture;
47 import java.util.concurrent.CompletionException;
48 import java.util.concurrent.ExecutionException;
49 import java.util.concurrent.ForkJoinPool;
50 import java.util.concurrent.Future;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import java.util.concurrent.atomic.AtomicReference;
54 import java.util.function.Consumer;
55 import java.util.function.Supplier;
56 import java.util.stream.Collectors;
59 import org.junit.AfterClass;
60 import org.junit.Before;
61 import org.junit.BeforeClass;
62 import org.junit.Test;
63 import org.mockito.ArgumentCaptor;
64 import org.mockito.Mock;
65 import org.mockito.MockitoAnnotations;
66 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
67 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
68 import org.onap.policy.common.utils.coder.Coder;
69 import org.onap.policy.common.utils.coder.CoderException;
70 import org.onap.policy.common.utils.coder.StandardCoder;
71 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
72 import org.onap.policy.common.utils.time.PseudoExecutor;
73 import org.onap.policy.controlloop.ControlLoopOperation;
74 import org.onap.policy.controlloop.VirtualControlLoopEvent;
75 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
76 import org.onap.policy.controlloop.actorserviceprovider.Operation;
77 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
78 import org.onap.policy.controlloop.actorserviceprovider.Operator;
79 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
80 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
81 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
82 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
83 import org.onap.policy.controlloop.policy.PolicyResult;
84 import org.slf4j.LoggerFactory;
86 public class OperationPartialTest {
87 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
88 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
89 private static final int MAX_REQUESTS = 100;
90 private static final int MAX_PARALLEL = 10;
91 private static final String EXPECTED_EXCEPTION = "expected exception";
92 private static final String ACTOR = "my-actor";
93 private static final String OPERATION = "my-operation";
94 private static final String MY_SINK = "my-sink";
95 private static final String MY_SOURCE = "my-source";
96 private static final String MY_TARGET_ENTITY = "my-entity";
97 private static final String TEXT = "my-text";
98 private static final int TIMEOUT = 1000;
99 private static final UUID REQ_ID = UUID.randomUUID();
101 private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
102 .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
105 * Used to attach an appender to the class' logger.
107 private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
108 private static final ExtractAppender appender = new ExtractAppender();
110 private static final List<String> PROP_NAMES = List.of("hello", "world");
113 private ActorService service;
115 private Actor guardActor;
117 private Operator guardOperator;
119 private Operation guardOperation;
121 private VirtualControlLoopEvent event;
122 private ControlLoopEventContext context;
123 private PseudoExecutor executor;
124 private ControlLoopOperationParams params;
128 private int numStart;
131 private Instant tstart;
133 private OperationOutcome opstart;
134 private OperationOutcome opend;
136 private Deque<OperationOutcome> starts;
137 private Deque<OperationOutcome> ends;
139 private OperatorConfig config;
142 * Attaches the appender to the logger.
145 public static void setUpBeforeClass() throws Exception {
147 * Attach appender to the logger.
149 appender.setContext(logger.getLoggerContext());
152 logger.addAppender(appender);
156 * Stops the appender.
159 public static void tearDownAfterClass() {
164 * Initializes the fields, including {@link #oper}.
167 public void setUp() {
168 MockitoAnnotations.initMocks(this);
170 event = new VirtualControlLoopEvent();
171 event.setRequestId(REQ_ID);
173 context = new ControlLoopEventContext(event);
174 executor = new PseudoExecutor();
176 params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
177 .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
178 .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
180 when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
181 when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
182 when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
183 when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
185 config = new OperatorConfig(executor);
194 starts = new ArrayDeque<>(10);
195 ends = new ArrayDeque<>(10);
199 public void testOperatorPartial_testGetActorName_testGetName() {
200 assertEquals(ACTOR, oper.getActorName());
201 assertEquals(OPERATION, oper.getName());
202 assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
206 public void testGetBlockingThread() throws Exception {
207 CompletableFuture<Void> future = new CompletableFuture<>();
209 // use the real executor
210 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
212 public Operation buildOperation(ControlLoopOperationParams params) {
217 oper2.getBlockingExecutor().execute(() -> future.complete(null));
219 assertNull(future.get(5, TimeUnit.SECONDS));
223 public void testGetPropertyNames() {
224 assertThat(oper.getPropertyNames()).isEqualTo(PROP_NAMES);
228 public void testGetProperty_testSetProperty() {
229 oper.setProperty("propertyA", "valueA");
230 oper.setProperty("propertyB", "valueB");
231 oper.setProperty("propertyC", 20);
233 assertEquals("valueA", oper.getProperty("propertyA"));
234 assertEquals("valueB", oper.getProperty("propertyB"));
235 assertEquals(Integer.valueOf(20), oper.getProperty("propertyC"));
239 public void testStart() {
240 verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
244 * Tests start() with multiple running requests.
247 public void testStartMultiple() {
248 for (int count = 0; count < MAX_PARALLEL; ++count) {
252 assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
254 assertNotNull(opstart);
255 assertNotNull(opend);
256 assertEquals(PolicyResult.SUCCESS, opend.getResult());
258 assertEquals(MAX_PARALLEL, numStart);
259 assertEquals(MAX_PARALLEL, oper.getCount());
260 assertEquals(MAX_PARALLEL, numEnd);
264 * Tests startPreprocessor() when the preprocessor returns a failure.
267 public void testStartPreprocessorFailure() {
268 oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
270 verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
274 * Tests startPreprocessor() when the preprocessor throws an exception.
277 public void testStartPreprocessorException() {
278 // arrange for the preprocessor to throw an exception
279 oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
281 verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
285 * Tests startPreprocessor() when the pipeline is not running.
288 public void testStartPreprocessorNotRunning() {
289 // arrange for the preprocessor to return success, which will be ignored
290 // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
292 oper.start().cancel(false);
293 assertTrue(executor.runAll(MAX_REQUESTS));
298 assertEquals(0, numStart);
299 assertEquals(0, oper.getCount());
300 assertEquals(0, numEnd);
304 * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
307 public void testStartPreprocessorBuilderException() {
308 oper = new MyOper() {
310 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
311 throw new IllegalStateException(EXPECTED_EXCEPTION);
315 assertThatIllegalStateException().isThrownBy(() -> oper.start());
317 // should be nothing in the queue
318 assertEquals(0, executor.getQueueLength());
322 public void testStartPreprocessorAsync() {
323 assertNull(oper.startPreprocessorAsync());
327 public void testStartGuardAsync() throws Exception {
328 CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
329 assertTrue(future.isDone());
330 assertEquals(PolicyResult.SUCCESS, future.get().getResult());
332 // verify the parameters that were passed
333 ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
334 ArgumentCaptor.forClass(ControlLoopOperationParams.class);
335 verify(guardOperator).buildOperation(paramsCaptor.capture());
337 params = paramsCaptor.getValue();
338 assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
339 assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
340 assertNull(params.getRetry());
341 assertNull(params.getTimeoutSec());
343 Map<String, Object> payload = params.getPayload();
344 assertNotNull(payload);
346 assertEquals(oper.makeGuardPayload(), payload);
350 * Tests startGuardAsync() when preprocessing is disabled.
353 public void testStartGuardAsyncDisabled() {
354 params = params.toBuilder().preprocessed(true).build();
355 assertNull(new MyOper().startGuardAsync());
359 public void testMakeGuardPayload() {
360 Map<String, Object> payload = oper.makeGuardPayload();
361 assertSame(REQ_ID, payload.get("requestId"));
363 // request id changes, so remove it
364 payload.remove("requestId");
366 assertEquals("{actor=my-actor, operation=my-operation, target=my-entity}", payload.toString());
368 // repeat, but with closed loop name
369 event.setClosedLoopControlName("my-loop");
370 payload = oper.makeGuardPayload();
371 payload.remove("requestId");
372 assertEquals("{actor=my-actor, operation=my-operation, target=my-entity, clname=my-loop}", payload.toString());
376 public void testStartOperationAsync() {
378 assertTrue(executor.runAll(MAX_REQUESTS));
380 assertEquals(1, oper.getCount());
384 public void testIsSuccess() {
385 assertFalse(oper.isSuccess(null));
387 OperationOutcome outcome = new OperationOutcome();
389 outcome.setResult(PolicyResult.SUCCESS);
390 assertTrue(oper.isSuccess(outcome));
392 for (PolicyResult failure : FAILURE_RESULTS) {
393 outcome.setResult(failure);
394 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
399 public void testIsActorFailed() {
400 assertFalse(oper.isActorFailed(null));
402 OperationOutcome outcome = params.makeOutcome();
405 outcome.setResult(PolicyResult.SUCCESS);
406 assertFalse(oper.isActorFailed(outcome));
408 outcome.setResult(PolicyResult.FAILURE_RETRIES);
409 assertFalse(oper.isActorFailed(outcome));
412 outcome.setResult(PolicyResult.FAILURE);
415 outcome.setActor(MY_SINK);
416 assertFalse(oper.isActorFailed(outcome));
417 outcome.setActor(null);
418 assertFalse(oper.isActorFailed(outcome));
419 outcome.setActor(ACTOR);
421 // incorrect operation
422 outcome.setOperation(MY_SINK);
423 assertFalse(oper.isActorFailed(outcome));
424 outcome.setOperation(null);
425 assertFalse(oper.isActorFailed(outcome));
426 outcome.setOperation(OPERATION);
429 assertTrue(oper.isActorFailed(outcome));
433 public void testDoOperation() {
435 * Use an operation that doesn't override doOperation().
437 OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
440 assertTrue(executor.runAll(MAX_REQUESTS));
442 assertNotNull(opend);
443 assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
447 public void testTimeout() throws Exception {
449 // use a real executor
450 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
452 // trigger timeout very quickly
453 oper = new MyOper() {
455 protected long getTimeoutMs(Integer timeoutSec) {
460 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
462 OperationOutcome outcome2 = params.makeOutcome();
463 outcome2.setResult(PolicyResult.SUCCESS);
466 * Create an incomplete future that will timeout after the operation's
467 * timeout. If it fires before the other timer, then it will return a
470 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
471 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
472 params.getExecutor());
478 assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
482 * Tests retry functions, when the count is set to zero and retries are exhausted.
485 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
486 params = params.toBuilder().retry(0).build();
488 // new params, thus need a new operation
491 oper.setMaxFailures(10);
493 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
497 * Tests retry functions, when the count is null and retries are exhausted.
500 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
501 params = params.toBuilder().retry(null).build();
503 // new params, thus need a new operation
506 oper.setMaxFailures(10);
508 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
512 * Tests retry functions, when retries are exhausted.
515 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
516 final int maxRetries = 3;
517 params = params.toBuilder().retry(maxRetries).build();
519 // new params, thus need a new operation
522 oper.setMaxFailures(10);
524 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
525 PolicyResult.FAILURE_RETRIES);
529 * Tests retry functions, when a success follows some retries.
532 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
533 params = params.toBuilder().retry(10).build();
535 // new params, thus need a new operation
538 final int maxFailures = 3;
539 oper.setMaxFailures(maxFailures);
541 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
542 PolicyResult.SUCCESS);
546 * Tests retry functions, when the outcome is {@code null}.
549 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
551 // arrange to return null from doOperation()
552 oper = new MyOper() {
554 protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
557 super.doOperation(attempt, outcome);
562 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, noop());
566 public void testSleep() throws Exception {
567 CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
568 assertTrue(future.isDone());
569 assertNull(future.get());
572 future = oper.sleep(0, TimeUnit.SECONDS);
573 assertTrue(future.isDone());
574 assertNull(future.get());
577 * Start a second sleep we can use to check the first while it's running.
579 tstart = Instant.now();
580 future = oper.sleep(100, TimeUnit.MILLISECONDS);
582 CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
584 // wait for second to complete and verify that the first has not completed
586 assertFalse(future.isDone());
588 // wait for second to complete
591 long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
592 assertTrue(diff >= 99);
596 public void testIsSameOperation() {
597 assertFalse(oper.isSameOperation(null));
599 OperationOutcome outcome = params.makeOutcome();
601 // wrong actor - should be false
602 outcome.setActor(null);
603 assertFalse(oper.isSameOperation(outcome));
604 outcome.setActor(MY_SINK);
605 assertFalse(oper.isSameOperation(outcome));
606 outcome.setActor(ACTOR);
608 // wrong operation - should be null
609 outcome.setOperation(null);
610 assertFalse(oper.isSameOperation(outcome));
611 outcome.setOperation(MY_SINK);
612 assertFalse(oper.isSameOperation(outcome));
613 outcome.setOperation(OPERATION);
615 assertTrue(oper.isSameOperation(outcome));
619 * Tests handleFailure() when the outcome is a success.
622 public void testHandlePreprocessorFailureSuccess() {
623 oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
624 verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
628 * Tests handleFailure() when the outcome is <i>not</i> a success.
631 public void testHandlePreprocessorFailureFailed() throws Exception {
632 oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
633 verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
637 * Tests handleFailure() when the outcome is {@code null}.
640 public void testHandlePreprocessorFailureNull() throws Exception {
641 // arrange to return a null outcome from the preprocessor
642 oper.setPreProc(CompletableFuture.completedFuture(null));
643 verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
647 public void testFromException() {
648 // arrange to generate an exception when operation runs
649 oper.setGenException(true);
651 verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
655 * Tests fromException() when there is no exception.
658 public void testFromExceptionNoExcept() {
659 verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
663 * Tests both flavors of anyOf(), because one invokes the other.
666 public void testAnyOf() throws Exception {
667 // first task completes, others do not
668 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
670 final OperationOutcome outcome = params.makeOutcome();
672 tasks.add(() -> CompletableFuture.completedFuture(outcome));
673 tasks.add(() -> new CompletableFuture<>());
674 tasks.add(() -> null);
675 tasks.add(() -> new CompletableFuture<>());
677 CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
678 assertTrue(executor.runAll(MAX_REQUESTS));
679 assertTrue(result.isDone());
680 assertSame(outcome, result.get());
682 // repeat using array form
683 @SuppressWarnings("unchecked")
684 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
685 result = oper.anyOf(tasks.toArray(taskArray));
686 assertTrue(executor.runAll(MAX_REQUESTS));
687 assertTrue(result.isDone());
688 assertSame(outcome, result.get());
690 // second task completes, others do not
692 tasks.add(() -> new CompletableFuture<>());
693 tasks.add(() -> CompletableFuture.completedFuture(outcome));
694 tasks.add(() -> new CompletableFuture<>());
696 result = oper.anyOf(tasks);
697 assertTrue(executor.runAll(MAX_REQUESTS));
698 assertTrue(result.isDone());
699 assertSame(outcome, result.get());
701 // third task completes, others do not
703 tasks.add(() -> new CompletableFuture<>());
704 tasks.add(() -> new CompletableFuture<>());
705 tasks.add(() -> CompletableFuture.completedFuture(outcome));
707 result = oper.anyOf(tasks);
708 assertTrue(executor.runAll(MAX_REQUESTS));
709 assertTrue(result.isDone());
710 assertSame(outcome, result.get());
714 * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
717 @SuppressWarnings("unchecked")
718 public void testAnyOfEdge() throws Exception {
719 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
721 // zero items: check both using a list and using an array
722 assertNull(oper.anyOf(tasks));
723 assertNull(oper.anyOf());
725 // one item: : check both using a list and using an array
726 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
727 tasks.add(() -> future1);
729 assertSame(future1, oper.anyOf(tasks));
730 assertSame(future1, oper.anyOf(() -> future1));
734 public void testAllOfArray() throws Exception {
735 final OperationOutcome outcome = params.makeOutcome();
737 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
738 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
739 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
741 @SuppressWarnings("unchecked")
742 CompletableFuture<OperationOutcome> result =
743 oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
745 assertTrue(executor.runAll(MAX_REQUESTS));
746 assertFalse(result.isDone());
747 future1.complete(outcome);
749 // complete 3 before 2
750 assertTrue(executor.runAll(MAX_REQUESTS));
751 assertFalse(result.isDone());
752 future3.complete(outcome);
754 assertTrue(executor.runAll(MAX_REQUESTS));
755 assertFalse(result.isDone());
756 future2.complete(outcome);
758 // all of them are now done
759 assertTrue(executor.runAll(MAX_REQUESTS));
760 assertTrue(result.isDone());
761 assertSame(outcome, result.get());
765 public void testAllOfList() throws Exception {
766 final OperationOutcome outcome = params.makeOutcome();
768 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
769 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
770 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
772 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
773 tasks.add(() -> future1);
774 tasks.add(() -> future2);
775 tasks.add(() -> null);
776 tasks.add(() -> future3);
778 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
780 assertTrue(executor.runAll(MAX_REQUESTS));
781 assertFalse(result.isDone());
782 future1.complete(outcome);
784 // complete 3 before 2
785 assertTrue(executor.runAll(MAX_REQUESTS));
786 assertFalse(result.isDone());
787 future3.complete(outcome);
789 assertTrue(executor.runAll(MAX_REQUESTS));
790 assertFalse(result.isDone());
791 future2.complete(outcome);
793 // all of them are now done
794 assertTrue(executor.runAll(MAX_REQUESTS));
795 assertTrue(result.isDone());
796 assertSame(outcome, result.get());
800 * Tests both flavors of allOf(), for edge cases: zero items, and one item.
803 @SuppressWarnings("unchecked")
804 public void testAllOfEdge() throws Exception {
805 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
807 // zero items: check both using a list and using an array
808 assertNull(oper.allOf(tasks));
809 assertNull(oper.allOf());
811 // one item: : check both using a list and using an array
812 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
813 tasks.add(() -> future1);
815 assertSame(future1, oper.allOf(tasks));
816 assertSame(future1, oper.allOf(() -> future1));
820 public void testAttachFutures() throws Exception {
821 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
823 // third task throws an exception during construction
824 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
825 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
826 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
827 tasks.add(() -> future1);
828 tasks.add(() -> future2);
830 throw new IllegalStateException(EXPECTED_EXCEPTION);
832 tasks.add(() -> future3);
834 assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
836 // should have canceled the first two, but not the last
837 assertTrue(future1.isCancelled());
838 assertTrue(future2.isCancelled());
839 assertFalse(future3.isCancelled());
843 public void testCombineOutcomes() throws Exception {
845 verifyOutcomes(0, PolicyResult.SUCCESS);
846 verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
848 // maximum is in different positions
849 verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
850 verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
851 verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
853 // null outcome - takes precedence over a success
854 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
855 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
856 tasks.add(() -> CompletableFuture.completedFuture(null));
857 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
858 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
860 assertTrue(executor.runAll(MAX_REQUESTS));
861 assertTrue(result.isDone());
862 assertNull(result.get());
864 // one throws an exception during execution
865 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
868 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
869 tasks.add(() -> CompletableFuture.failedFuture(except));
870 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
871 result = oper.allOf(tasks);
873 assertTrue(executor.runAll(MAX_REQUESTS));
874 assertTrue(result.isCompletedExceptionally());
875 result.whenComplete((unused, thrown) -> assertSame(except, thrown));
879 * Tests both flavors of sequence(), because one invokes the other.
882 public void testSequence() throws Exception {
883 final OperationOutcome outcome = params.makeOutcome();
885 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
886 tasks.add(() -> CompletableFuture.completedFuture(outcome));
887 tasks.add(() -> null);
888 tasks.add(() -> CompletableFuture.completedFuture(outcome));
889 tasks.add(() -> CompletableFuture.completedFuture(outcome));
891 CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
892 assertTrue(executor.runAll(MAX_REQUESTS));
893 assertTrue(result.isDone());
894 assertSame(outcome, result.get());
896 // repeat using array form
897 @SuppressWarnings("unchecked")
898 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
899 result = oper.sequence(tasks.toArray(taskArray));
900 assertTrue(executor.runAll(MAX_REQUESTS));
901 assertTrue(result.isDone());
902 assertSame(outcome, result.get());
904 // second task fails, third should not run
905 OperationOutcome failure = params.makeOutcome();
906 failure.setResult(PolicyResult.FAILURE);
908 tasks.add(() -> CompletableFuture.completedFuture(outcome));
909 tasks.add(() -> CompletableFuture.completedFuture(failure));
910 tasks.add(() -> CompletableFuture.completedFuture(outcome));
912 result = oper.sequence(tasks);
913 assertTrue(executor.runAll(MAX_REQUESTS));
914 assertTrue(result.isDone());
915 assertSame(failure, result.get());
919 * Tests both flavors of sequence(), for edge cases: zero items, and one item.
922 @SuppressWarnings("unchecked")
923 public void testSequenceEdge() throws Exception {
924 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
926 // zero items: check both using a list and using an array
927 assertNull(oper.sequence(tasks));
928 assertNull(oper.sequence());
930 // one item: : check both using a list and using an array
931 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
932 tasks.add(() -> future1);
934 assertSame(future1, oper.sequence(tasks));
935 assertSame(future1, oper.sequence(() -> future1));
938 private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
939 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
941 OperationOutcome expectedOutcome = null;
943 for (int count = 0; count < results.length; ++count) {
944 OperationOutcome outcome = params.makeOutcome();
945 outcome.setResult(results[count]);
946 tasks.add(() -> CompletableFuture.completedFuture(outcome));
948 if (count == expected) {
949 expectedOutcome = outcome;
953 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
955 assertTrue(executor.runAll(MAX_REQUESTS));
956 assertTrue(result.isDone());
957 assertSame(expectedOutcome, result.get());
961 public void testDetmPriority() throws CoderException {
962 assertEquals(1, oper.detmPriority(null));
964 OperationOutcome outcome = params.makeOutcome();
966 Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
967 PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
968 PolicyResult.FAILURE_EXCEPTION, 6);
970 for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
971 outcome.setResult(ent.getKey());
972 assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
976 * Test null result. We can't actually set it to null, because the set() method
977 * won't allow it. Instead, we decode it from a structure.
979 outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
980 assertEquals(1, oper.detmPriority(outcome));
984 * Tests callbackStarted() when the pipeline has already been stopped.
987 public void testCallbackStartedNotRunning() {
988 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
991 * arrange to stop the controller when the start-callback is invoked, but capture
994 params = params.toBuilder().startCallback(oper -> {
996 future.get().cancel(false);
999 // new params, thus need a new operation
1000 oper = new MyOper();
1002 future.set(oper.start());
1003 assertTrue(executor.runAll(MAX_REQUESTS));
1005 // should have only run once
1006 assertEquals(1, numStart);
1010 * Tests callbackCompleted() when the pipeline has already been stopped.
1013 public void testCallbackCompletedNotRunning() {
1014 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
1016 // arrange to stop the controller when the start-callback is invoked
1017 params = params.toBuilder().startCallback(oper -> {
1018 future.get().cancel(false);
1021 // new params, thus need a new operation
1022 oper = new MyOper();
1024 future.set(oper.start());
1025 assertTrue(executor.runAll(MAX_REQUESTS));
1027 // should not have been set
1029 assertEquals(0, numEnd);
1033 public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1034 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1036 OperationOutcome outcome;
1038 outcome = new OperationOutcome();
1039 oper.setOutcome(outcome, timex);
1040 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1041 assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1043 outcome = new OperationOutcome();
1044 oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1045 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1046 assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
1050 public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1051 OperationOutcome outcome;
1053 outcome = new OperationOutcome();
1054 oper.setOutcome(outcome, PolicyResult.SUCCESS);
1055 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1056 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1058 oper.setOutcome(outcome, PolicyResult.SUCCESS);
1059 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1060 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
1062 for (PolicyResult result : FAILURE_RESULTS) {
1063 outcome = new OperationOutcome();
1064 oper.setOutcome(outcome, result);
1065 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1066 assertEquals(result.toString(), result, outcome.getResult());
1071 public void testIsTimeout() {
1072 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1074 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1075 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1076 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1077 assertFalse(oper.isTimeout(new CompletionException(null)));
1078 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1080 assertTrue(oper.isTimeout(timex));
1081 assertTrue(oper.isTimeout(new CompletionException(timex)));
1085 public void testLogMessage() {
1086 final String infraStr = SINK_INFRA.toString();
1088 // log structured data
1089 appender.clearExtractions();
1090 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1091 List<String> output = appender.getExtracted();
1092 assertEquals(1, output.size());
1094 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1095 .contains("{\n \"text\": \"my-text\"\n}");
1097 // repeat with a response
1098 appender.clearExtractions();
1099 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1100 output = appender.getExtracted();
1101 assertEquals(1, output.size());
1103 assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1104 .contains("{\n \"text\": \"my-text\"\n}");
1106 // log a plain string
1107 appender.clearExtractions();
1108 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1109 output = appender.getExtracted();
1110 assertEquals(1, output.size());
1111 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1113 // log a null request
1114 appender.clearExtractions();
1115 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1116 output = appender.getExtracted();
1117 assertEquals(1, output.size());
1119 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1121 // generate exception from coder
1122 setOperCoderException();
1124 appender.clearExtractions();
1125 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1126 output = appender.getExtracted();
1127 assertEquals(2, output.size());
1128 assertThat(output.get(0)).contains("cannot pretty-print request");
1129 assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1131 // repeat with a response
1132 appender.clearExtractions();
1133 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1134 output = appender.getExtracted();
1135 assertEquals(2, output.size());
1136 assertThat(output.get(0)).contains("cannot pretty-print response");
1137 assertThat(output.get(1)).contains(MY_SOURCE);
1141 public void testGetRetry() {
1142 assertEquals(0, oper.getRetry(null));
1143 assertEquals(10, oper.getRetry(10));
1147 public void testGetRetryWait() {
1148 // need an operator that doesn't override the retry time
1149 OperationPartial oper2 = new OperationPartial(params, config, Collections.emptyList()) {};
1150 assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1154 public void testGetTimeOutMs() {
1155 assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1157 params = params.toBuilder().timeoutSec(null).build();
1159 // new params, thus need a new operation
1160 oper = new MyOper();
1162 assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1165 private void starter(OperationOutcome oper) {
1167 tstart = oper.getStart();
1172 private void completer(OperationOutcome oper) {
1179 * Gets a function that does nothing.
1181 * @param <T> type of input parameter expected by the function
1182 * @return a function that does nothing
1184 private <T> Consumer<T> noop() {
1189 private OperationOutcome makeSuccess() {
1190 OperationOutcome outcome = params.makeOutcome();
1191 outcome.setResult(PolicyResult.SUCCESS);
1196 private OperationOutcome makeFailure() {
1197 OperationOutcome outcome = params.makeOutcome();
1198 outcome.setResult(PolicyResult.FAILURE);
1206 * @param testName test name
1207 * @param expectedCallbacks number of callbacks expected
1208 * @param expectedOperations number of operation invocations expected
1209 * @param expectedResult expected outcome
1211 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1212 PolicyResult expectedResult) {
1214 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, noop());
1220 * @param testName test name
1221 * @param expectedCallbacks number of callbacks expected
1222 * @param expectedOperations number of operation invocations expected
1223 * @param expectedResult expected outcome
1224 * @param manipulator function to modify the future returned by
1225 * {@link OperationPartial#start(ControlLoopOperationParams)} before the tasks
1226 * in the executor are run
1228 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1229 Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1237 CompletableFuture<OperationOutcome> future = oper.start();
1239 manipulator.accept(future);
1241 assertTrue(testName, executor.runAll(MAX_REQUESTS));
1243 assertEquals(testName, expectedCallbacks, numStart);
1244 assertEquals(testName, expectedCallbacks, numEnd);
1246 if (expectedCallbacks > 0) {
1247 assertNotNull(testName, opstart);
1248 assertNotNull(testName, opend);
1249 assertEquals(testName, expectedResult, opend.getResult());
1251 assertSame(testName, tstart, opstart.getStart());
1252 assertSame(testName, tstart, opend.getStart());
1255 assertTrue(future.isDone());
1256 assertEquals(testName, opend, future.get());
1258 // "start" is never final
1259 for (OperationOutcome outcome : starts) {
1260 assertFalse(testName, outcome.isFinalOutcome());
1263 // only the last "complete" is final
1264 assertTrue(testName, ends.removeLast().isFinalOutcome());
1266 for (OperationOutcome outcome : ends) {
1267 assertFalse(outcome.isFinalOutcome());
1270 } catch (InterruptedException | ExecutionException e) {
1271 throw new IllegalStateException(e);
1274 if (expectedOperations > 0) {
1275 assertNotNull(testName, oper.getSubRequestId());
1276 assertEquals(testName + " op start", oper.getSubRequestId(), opstart.getSubRequestId());
1277 assertEquals(testName + " op end", oper.getSubRequestId(), opend.getSubRequestId());
1281 assertEquals(testName, expectedOperations, oper.getCount());
1285 * Creates a new {@link #oper} whose coder will throw an exception.
1287 private void setOperCoderException() {
1288 oper = new MyOper() {
1290 protected Coder getCoder() {
1291 return new StandardCoder() {
1293 public String encode(Object object, boolean pretty) throws CoderException {
1294 throw new CoderException(EXPECTED_EXCEPTION);
1303 public static class MyData {
1304 private String text = TEXT;
1308 private class MyOper extends OperationPartial {
1310 private int count = 0;
1313 private boolean genException;
1315 private int maxFailures = 0;
1317 private CompletableFuture<OperationOutcome> preProc;
1321 super(OperationPartialTest.this.params, config, PROP_NAMES);
1325 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1328 throw new IllegalStateException(EXPECTED_EXCEPTION);
1331 operation.setSubRequestId(String.valueOf(attempt));
1333 if (count > maxFailures) {
1334 operation.setResult(PolicyResult.SUCCESS);
1336 operation.setResult(PolicyResult.FAILURE);
1343 protected long getRetryWaitMs() {
1345 * Sleep timers run in the background, but we want to control things via the
1346 * "executor", thus we avoid sleep timers altogether by simply returning 0.
1352 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
1353 return (preProc != null ? preProc : super.startPreprocessorAsync());