2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
35 import ch.qos.logback.classic.Logger;
36 import java.time.Instant;
37 import java.util.ArrayDeque;
38 import java.util.Arrays;
39 import java.util.Deque;
40 import java.util.LinkedList;
41 import java.util.List;
43 import java.util.Map.Entry;
44 import java.util.UUID;
45 import java.util.concurrent.CompletableFuture;
46 import java.util.concurrent.CompletionException;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.ForkJoinPool;
49 import java.util.concurrent.Future;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import java.util.concurrent.atomic.AtomicReference;
53 import java.util.function.Consumer;
54 import java.util.function.Supplier;
55 import java.util.stream.Collectors;
58 import org.junit.AfterClass;
59 import org.junit.Before;
60 import org.junit.BeforeClass;
61 import org.junit.Test;
62 import org.mockito.ArgumentCaptor;
63 import org.mockito.Mock;
64 import org.mockito.MockitoAnnotations;
65 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
66 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
67 import org.onap.policy.common.utils.coder.Coder;
68 import org.onap.policy.common.utils.coder.CoderException;
69 import org.onap.policy.common.utils.coder.StandardCoder;
70 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
71 import org.onap.policy.common.utils.time.PseudoExecutor;
72 import org.onap.policy.controlloop.ControlLoopOperation;
73 import org.onap.policy.controlloop.VirtualControlLoopEvent;
74 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
75 import org.onap.policy.controlloop.actorserviceprovider.Operation;
76 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
77 import org.onap.policy.controlloop.actorserviceprovider.Operator;
78 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
79 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
80 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
81 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
82 import org.onap.policy.controlloop.policy.PolicyResult;
83 import org.slf4j.LoggerFactory;
85 public class OperationPartialTest {
// Communication infrastructures handed to logMessage() for the outgoing (sink) and incoming (source) cases.
86 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
87 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
// Upper bound passed to executor.runAll(), and the loop bound used by testStartMultiple().
88 private static final int MAX_REQUESTS = 100;
89 private static final int MAX_PARALLEL = 10;
// Message carried by exceptions that the tests raise on purpose.
90 private static final String EXPECTED_EXCEPTION = "expected exception";
// Actor and operation names placed in the operation parameters by setUp().
91 private static final String ACTOR = "my-actor";
92 private static final String OPERATION = "my-operation";
// Topic names; MY_SINK also doubles as a deliberately wrong actor/operation name in several tests.
93 private static final String MY_SINK = "my-sink";
94 private static final String MY_SOURCE = "my-source";
95 private static final String MY_TARGET_ENTITY = "my-entity";
96 private static final String TEXT = "my-text";
// Value passed to timeoutSec() when the parameters are built.
97 private static final int TIMEOUT = 1000;
// Request id assigned to the event; generated once per class load.
98 private static final UUID REQ_ID = UUID.randomUUID();
// Every PolicyResult value except SUCCESS.
100 private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
101 .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
104 * Used to attach an appender to the class' logger.
// Logger of OperationPartial, cast to the logback type so getLoggerContext()/addAppender() can be called on it.
106 private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
// Appender attached in setUpBeforeClass(); its extracted output is read back by testLogMessage().
107 private static final ExtractAppender appender = new ExtractAppender();
// Collaborators stubbed with Mockito in setUp() so that the guard lookup chain resolves.
// NOTE(review): presumably @Mock-annotated; the annotations are not visible in this view.
110 private ActorService service;
112 private Actor guardActor;
114 private Operator guardOperator;
116 private Operation guardOperation;
// Event, context, executor and parameters rebuilt by setUp().
118 private VirtualControlLoopEvent event;
119 private ControlLoopEventContext context;
120 private PseudoExecutor executor;
121 private ControlLoopOperationParams params;
// Count compared by the tests against the expected number of starts.
// NOTE(review): presumably maintained by the starter() callback, which is not visible here.
125 private int numStart;
// Start time captured by testSleep().
128 private Instant tstart;
// Outcomes that tests assert on after a run (see testStartMultiple()).
130 private OperationOutcome opstart;
131 private OperationOutcome opend;
// Queues re-created empty by setUp().
133 private Deque<OperationOutcome> starts;
134 private Deque<OperationOutcome> ends;
// Operator configuration built around the pseudo executor in setUp().
136 private OperatorConfig config;
139 * Attaches the appender to the logger.
// Binds the shared ExtractAppender to the logger's context and attaches it to the logger.
// NOTE(review): any further appender configuration (e.g. starting it) is not visible in this view.
142 public static void setUpBeforeClass() throws Exception {
144 * Attach appender to the logger.
146 appender.setContext(logger.getLoggerContext());
149 logger.addAppender(appender);
153 * Stops the appender.
// NOTE(review): the body is not visible in this view; the preceding Javadoc states that it stops the appender.
156 public static void tearDownAfterClass() {
161 * Initializes the fields, including {@link #oper}.
// Builds a fresh event, context, pseudo executor, operation parameters and guard stubs.
164 public void setUp() {
165 MockitoAnnotations.initMocks(this);
167 event = new VirtualControlLoopEvent();
168 event.setRequestId(REQ_ID);
170 context = new ControlLoopEventContext(event);
171 executor = new PseudoExecutor();
// the start/complete callbacks are routed to this test class (starter/completer)
173 params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
174 .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
175 .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
// stub the service -> actor -> operator -> operation chain; the guard operation's start() returns an
// already-completed future holding makeSuccess()
177 when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
178 when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
179 when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
180 when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
182 config = new OperatorConfig(executor);
// NOTE(review): further initialisation (e.g. of the "oper" field used throughout) is not visible in this view
191 starts = new ArrayDeque<>(10);
192 ends = new ArrayDeque<>(10);
// Checks the actor name, the operation name and the "<actor>.<operation>" full name reported by oper.
196 public void testOperatorPartial_testGetActorName_testGetName() {
197 assertEquals(ACTOR, oper.getActorName());
198 assertEquals(OPERATION, oper.getName());
199 assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
// Verifies that a task handed to an OperatorPartial's blocking executor actually runs.
203 public void testGetBlockingThread() throws Exception {
204 CompletableFuture<Void> future = new CompletableFuture<>();
206 // use the real executor
207 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
// NOTE(review): the body of this override is not visible in this view
209 public Operation buildOperation(ControlLoopOperationParams params) {
// the submitted task completes the future with null
214 oper2.getBlockingExecutor().execute(() -> future.complete(null));
// wait at most five seconds for the task to have run
216 assertNull(future.get(5, TimeUnit.SECONDS));
// Runs the default operation through verifyRun() and expects a SUCCESS result.
// NOTE(review): verifyRun() is not visible here; the two counts are presumably the expected numbers of
// callbacks and of operation invocations - confirm against its definition.
220 public void testStart() {
221 verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
225 * Tests start() with multiple running requests.
// Expects MAX_PARALLEL starts, operation invocations and completions, with a SUCCESS final outcome.
228 public void testStartMultiple() {
// NOTE(review): the loop body (presumably starting the operation) is not visible in this view
229 for (int count = 0; count < MAX_PARALLEL; ++count) {
// drain the pseudo executor; runAll() must report that the queue emptied within the given bound
233 assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
235 assertNotNull(opstart);
236 assertNotNull(opend);
237 assertEquals(PolicyResult.SUCCESS, opend.getResult());
239 assertEquals(MAX_PARALLEL, numStart);
240 assertEquals(MAX_PARALLEL, oper.getCount());
241 assertEquals(MAX_PARALLEL, numEnd);
245 * Tests startPreprocessor() when the preprocessor returns a failure.
// Installs a pre-processor future already completed with makeFailure() and expects FAILURE_GUARD.
248 public void testStartPreprocessorFailure() {
249 oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
251 verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
255 * Tests startPreprocessor() when the preprocessor throws an exception.
// Installs a pre-processor future that failed with an IllegalStateException and expects FAILURE_GUARD.
258 public void testStartPreprocessorException() {
259 // arrange for the preprocessor to throw an exception
260 oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
262 verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
266 * Tests startPreprocessor() when the pipeline is not running.
// Cancels the future returned by start() before the executor runs, then checks that all three
// counters (starts, operation invocations, ends) are still zero.
269 public void testStartPreprocessorNotRunning() {
270 // arrange for the preprocessor to return success, which will be ignored
271 // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
// cancel immediately, before any queued work has been executed
273 oper.start().cancel(false);
274 assertTrue(executor.runAll(MAX_REQUESTS));
279 assertEquals(0, numStart);
280 assertEquals(0, oper.getCount());
281 assertEquals(0, numEnd);
285 * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
// Replaces oper with a MyOper whose startPreprocessorAsync() throws, and expects start() to
// propagate the IllegalStateException without queueing any work.
288 public void testStartPreprocessorBuilderException() {
289 oper = new MyOper() {
291 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
292 throw new IllegalStateException(EXPECTED_EXCEPTION);
296 assertThatIllegalStateException().isThrownBy(() -> oper.start());
298 // should be nothing in the queue
299 assertEquals(0, executor.getQueueLength());
// Expects startPreprocessorAsync() on oper to return null.
303 public void testStartPreprocessorAsync() {
304 assertNull(oper.startPreprocessorAsync());
// Starts the guard and inspects the parameters that were handed to the (mocked) guard operator.
308 public void testStartGuardAsync() throws Exception {
309 CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
// the stubbed guard operation returns an already-completed future, so the result is available at once
310 assertTrue(future.isDone());
311 assertEquals(PolicyResult.SUCCESS, future.get().getResult());
313 // verify the parameters that were passed
314 ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
315 ArgumentCaptor.forClass(ControlLoopOperationParams.class);
316 verify(guardOperator).buildOperation(paramsCaptor.capture());
// note: this overwrites the params field with the captured guard parameters
318 params = paramsCaptor.getValue();
319 assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
320 assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
// the guard parameters are expected to carry neither a retry count nor a timeout
321 assertNull(params.getRetry());
322 assertNull(params.getTimeoutSec());
// the guard payload is expected under payload["resource"]["guard"]
324 Map<String, Object> payload = params.getPayload();
325 assertNotNull(payload);
327 @SuppressWarnings("unchecked")
328 Map<String, Object> resource = (Map<String, Object>) payload.get("resource");
329 assertNotNull(resource);
331 @SuppressWarnings("unchecked")
332 Map<String, Object> guard = (Map<String, Object>) resource.get("guard");
333 assertEquals(oper.makeGuardPayload(), guard);
// Checks the guard payload contents, with and without a closed-loop control name on the event.
// The comparisons use Map.toString(), so they depend on the payload map's iteration order.
337 public void testMakeGuardPayload() {
338 Map<String, Object> payload = oper.makeGuardPayload();
// the payload must hold the very same UUID instance that was set on the event
339 assertSame(REQ_ID, payload.get("requestId"));
341 // request id changes, so remove it
342 payload.remove("requestId");
344 assertEquals("{actor=my-actor, recipe=my-operation, target=my-entity}", payload.toString());
346 // repeat, but with closed loop name
347 event.setClosedLoopControlName("my-loop");
348 payload = oper.makeGuardPayload();
349 payload.remove("requestId");
350 assertEquals("{actor=my-actor, recipe=my-operation, target=my-entity, clname=my-loop}", payload.toString());
// Expects exactly one operation invocation once the executor has been drained.
// NOTE(review): the statement that starts the operation is not visible in this view.
354 public void testStartOperationAsync() {
356 assertTrue(executor.runAll(MAX_REQUESTS));
358 assertEquals(1, oper.getCount());
// isSuccess() must be false for a null outcome, true for SUCCESS and false for every other result.
362 public void testIsSuccess() {
363 assertFalse(oper.isSuccess(null));
365 OperationOutcome outcome = new OperationOutcome();
367 outcome.setResult(PolicyResult.SUCCESS);
368 assertTrue(oper.isSuccess(outcome));
// the assertion message names the failing result value
370 for (PolicyResult failure : FAILURE_RESULTS) {
371 outcome.setResult(failure);
372 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
// isActorFailed() is expected to be true only when the result is FAILURE and both the actor and
// the operation of the outcome match this operation; every other combination below must be false.
377 public void testIsActorFailed() {
378 assertFalse(oper.isActorFailed(null));
380 OperationOutcome outcome = params.makeOutcome();
// results other than FAILURE
383 outcome.setResult(PolicyResult.SUCCESS);
384 assertFalse(oper.isActorFailed(outcome));
386 outcome.setResult(PolicyResult.FAILURE_RETRIES);
387 assertFalse(oper.isActorFailed(outcome));
390 outcome.setResult(PolicyResult.FAILURE);
// incorrect (or missing) actor; restored to ACTOR afterwards
393 outcome.setActor(MY_SINK);
394 assertFalse(oper.isActorFailed(outcome));
395 outcome.setActor(null);
396 assertFalse(oper.isActorFailed(outcome));
397 outcome.setActor(ACTOR);
399 // incorrect operation
400 outcome.setOperation(MY_SINK);
401 assertFalse(oper.isActorFailed(outcome));
402 outcome.setOperation(null);
403 assertFalse(oper.isActorFailed(outcome));
404 outcome.setOperation(OPERATION);
// everything matches now
407 assertTrue(oper.isActorFailed(outcome));
// Expects a FAILURE_EXCEPTION outcome from an OperationPartial that does not override doOperation().
// NOTE(review): the statement that starts oper2 is not visible in this view.
411 public void testDoOperation() {
413 * Use an operation that doesn't override doOperation().
415 OperationPartial oper2 = new OperationPartial(params, config) {};
418 assertTrue(executor.runAll(MAX_REQUESTS));
420 assertNotNull(opend);
421 assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
// Expects FAILURE_TIMEOUT when the operation's own timeout fires before the operation's future completes.
425 public void testTimeout() throws Exception {
427 // use a real executor
428 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
430 // trigger timeout very quickly
431 oper = new MyOper() {
// NOTE(review): the body of this override (the shortened timeout value) is not visible in this view
433 protected long getTimeoutMs(Integer timeoutSec) {
438 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
440 OperationOutcome outcome2 = params.makeOutcome();
441 outcome2.setResult(PolicyResult.SUCCESS);
444 * Create an incomplete future that will timeout after the operation's
445 * timeout. If it fires before the other timer, then it will return a
448 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
// NOTE(review): outcome2 is prepared above, yet the handler visible here returns "outcome" - confirm
// which of the two was intended
449 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
450 params.getExecutor());
// blocks on the real executor until the pipeline finishes
456 assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
460 * Tests retry functions, when the count is set to zero and retries are exhausted.
// With a retry count of zero, a failing operation is expected to end with plain FAILURE.
463 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
464 params = params.toBuilder().retry(0).build();
466 // new params, thus need a new operation
// NOTE(review): the statement that rebuilds the operation is not visible in this view
469 oper.setMaxFailures(10);
471 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
475 * Tests retry functions, when the count is null and retries are exhausted.
// With a null retry count, a failing operation is expected to end with plain FAILURE.
478 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
479 params = params.toBuilder().retry(null).build();
481 // new params, thus need a new operation
// NOTE(review): the statement that rebuilds the operation is not visible in this view
484 oper.setMaxFailures(10);
486 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
490 * Tests retry functions, when retries are exhausted.
// With three retries allowed and more failures than that configured, expects maxRetries + 1 attempts
// and a final FAILURE_RETRIES result.
493 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
494 final int maxRetries = 3;
495 params = params.toBuilder().retry(maxRetries).build();
497 // new params, thus need a new operation
// NOTE(review): the statement that rebuilds the operation is not visible in this view
500 oper.setMaxFailures(10);
502 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
503 PolicyResult.FAILURE_RETRIES);
507 * Tests retry functions, when a success follows some retries.
// With ten retries allowed and three failures configured, expects maxFailures + 1 attempts and SUCCESS.
510 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
511 params = params.toBuilder().retry(10).build();
513 // new params, thus need a new operation
// NOTE(review): the statement that rebuilds the operation is not visible in this view
516 final int maxFailures = 3;
517 oper.setMaxFailures(maxFailures);
519 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
520 PolicyResult.SUCCESS);
524 * Tests retry functions, when the outcome is {@code null}.
// Expects a FAILURE result when doOperation() yields a null outcome.
527 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
529 // arrange to return null from doOperation()
530 oper = new MyOper() {
532 protected OperationOutcome doOperation(int attempt, OperationOutcome outcome) {
// the superclass is still invoked; NOTE(review): the return statement is not visible in this view
535 super.doOperation(attempt, outcome);
// uses the six-argument verifyRun() overload (not visible here), passing null and noop()
540 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
// sleep() with a negative or zero duration must return an already-completed future; a positive
// duration must not complete early.
544 public void testSleep() throws Exception {
545 CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
546 assertTrue(future.isDone());
547 assertNull(future.get());
550 future = oper.sleep(0, TimeUnit.SECONDS);
551 assertTrue(future.isDone());
552 assertNull(future.get());
555 * Start a second sleep we can use to check the first while it's running.
557 tstart = Instant.now();
558 future = oper.sleep(100, TimeUnit.MILLISECONDS);
560 CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
562 // wait for second to complete and verify that the first has not completed
564 assertFalse(future.isDone());
// NOTE(review): the next comment probably means the FIRST sleep; the wait statement itself is not
// visible in this view - confirm
566 // wait for second to complete
// elapsed wall-clock time since the 100 ms sleep was started; 99 leaves one millisecond of slack
// NOTE(review): wall-clock based, so this may be timing-sensitive on a loaded machine
569 long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
570 assertTrue(diff >= 99);
// isSameOperation() must be true only when both the actor and the operation of the outcome match.
574 public void testIsSameOperation() {
575 assertFalse(oper.isSameOperation(null));
577 OperationOutcome outcome = params.makeOutcome();
579 // wrong actor - should be false
580 outcome.setActor(null);
581 assertFalse(oper.isSameOperation(outcome));
582 outcome.setActor(MY_SINK);
583 assertFalse(oper.isSameOperation(outcome));
584 outcome.setActor(ACTOR);
586 // wrong operation - should be false
587 outcome.setOperation(null);
588 assertFalse(oper.isSameOperation(outcome));
589 outcome.setOperation(MY_SINK);
590 assertFalse(oper.isSameOperation(outcome));
591 outcome.setOperation(OPERATION);
// actor and operation both restored
593 assertTrue(oper.isSameOperation(outcome));
597 * Tests handleFailure() when the outcome is a success.
// A pre-processor that completed with makeSuccess() must let the operation run to SUCCESS.
// Note that the label passed to verifyRun() ("...Failure True") differs from this method's name.
600 public void testHandlePreprocessorFailureSuccess() {
601 oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
602 verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
606 * Tests handleFailure() when the outcome is <i>not</i> a success.
// A pre-processor that completed with makeFailure() must yield FAILURE_GUARD.
// Note that the label passed to verifyRun() ("...Failure False") differs from this method's name.
609 public void testHandlePreprocessorFailureFailed() throws Exception {
610 oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
611 verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
615 * Tests handleFailure() when the outcome is {@code null}.
// A pre-processor that completed with a null outcome must yield FAILURE_GUARD.
618 public void testHandlePreprocessorFailureNull() throws Exception {
619 // arrange to return a null outcome from the preprocessor
620 oper.setPreProc(CompletableFuture.completedFuture(null));
621 verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
// Expects FAILURE_EXCEPTION when the operation is told to generate an exception.
625 public void testFromException() {
626 // arrange to generate an exception when operation runs
627 oper.setGenException(true);
629 verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
633 * Tests fromException() when there is no exception.
// Without any exception configured, the run is expected to end in SUCCESS.
636 public void testFromExceptionNoExcept() {
637 verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
641 * Tests both flavors of anyOf(), because one invokes the other.
// anyOf() must finish with the outcome of whichever task completes, regardless of that task's
// position in the list; both the list form and the array form are exercised.
// NOTE(review): the task list is evidently reset between the sections below, but those statements
// are not visible in this view.
644 public void testAnyOf() throws Exception {
645 // first task completes, others do not
646 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
648 final OperationOutcome outcome = params.makeOutcome();
// one completed future, two that never complete, and one supplier that returns null
650 tasks.add(() -> CompletableFuture.completedFuture(outcome));
651 tasks.add(() -> new CompletableFuture<>());
652 tasks.add(() -> null);
653 tasks.add(() -> new CompletableFuture<>());
655 CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
656 assertTrue(executor.runAll(MAX_REQUESTS));
657 assertTrue(result.isDone());
658 assertSame(outcome, result.get());
660 // repeat using array form
661 @SuppressWarnings("unchecked")
662 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
663 result = oper.anyOf(tasks.toArray(taskArray));
664 assertTrue(executor.runAll(MAX_REQUESTS));
665 assertTrue(result.isDone());
666 assertSame(outcome, result.get());
668 // second task completes, others do not
670 tasks.add(() -> new CompletableFuture<>());
671 tasks.add(() -> CompletableFuture.completedFuture(outcome));
672 tasks.add(() -> new CompletableFuture<>());
674 result = oper.anyOf(tasks);
675 assertTrue(executor.runAll(MAX_REQUESTS));
676 assertTrue(result.isDone());
677 assertSame(outcome, result.get());
679 // third task completes, others do not
681 tasks.add(() -> new CompletableFuture<>());
682 tasks.add(() -> new CompletableFuture<>());
683 tasks.add(() -> CompletableFuture.completedFuture(outcome));
685 result = oper.anyOf(tasks);
686 assertTrue(executor.runAll(MAX_REQUESTS));
687 assertTrue(result.isDone());
688 assertSame(outcome, result.get());
692 * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
// Edge cases of anyOf(): no tasks yields null; a single task yields that task's own future.
695 @SuppressWarnings("unchecked")
696 public void testAnyOfEdge() throws Exception {
697 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
699 // zero items: check both using a list and using an array
700 assertNull(oper.anyOf(tasks));
701 assertNull(oper.anyOf());
703 // one item: check both using a list and using an array
704 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
705 tasks.add(() -> future1);
// the very same future instance is expected back, not a wrapper
707 assertSame(future1, oper.anyOf(tasks));
708 assertSame(future1, oper.anyOf(() -> future1));
// Array (varargs) form of allOf(): the combined future must stay incomplete until every non-null
// task future has completed, whatever the completion order.
712 public void testAllOfArray() throws Exception {
713 final OperationOutcome outcome = params.makeOutcome();
715 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
716 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
717 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
// the third supplier returns null rather than a future
719 @SuppressWarnings("unchecked")
720 CompletableFuture<OperationOutcome> result =
721 oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
723 assertTrue(executor.runAll(MAX_REQUESTS));
724 assertFalse(result.isDone());
725 future1.complete(outcome);
727 // complete 3 before 2
728 assertTrue(executor.runAll(MAX_REQUESTS));
729 assertFalse(result.isDone());
730 future3.complete(outcome);
732 assertTrue(executor.runAll(MAX_REQUESTS));
733 assertFalse(result.isDone());
734 future2.complete(outcome);
736 // all of them are now done
737 assertTrue(executor.runAll(MAX_REQUESTS));
738 assertTrue(result.isDone());
739 assertSame(outcome, result.get());
// List form of allOf(): the combined future must stay incomplete until every non-null task future
// has completed, whatever the completion order.
743 public void testAllOfList() throws Exception {
744 final OperationOutcome outcome = params.makeOutcome();
746 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
747 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
748 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
// the third supplier returns null rather than a future
750 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
751 tasks.add(() -> future1);
752 tasks.add(() -> future2);
753 tasks.add(() -> null);
754 tasks.add(() -> future3);
756 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
758 assertTrue(executor.runAll(MAX_REQUESTS));
759 assertFalse(result.isDone());
760 future1.complete(outcome);
762 // complete 3 before 2
763 assertTrue(executor.runAll(MAX_REQUESTS));
764 assertFalse(result.isDone());
765 future3.complete(outcome);
767 assertTrue(executor.runAll(MAX_REQUESTS));
768 assertFalse(result.isDone());
769 future2.complete(outcome);
771 // all of them are now done
772 assertTrue(executor.runAll(MAX_REQUESTS));
773 assertTrue(result.isDone());
774 assertSame(outcome, result.get());
778 * Tests both flavors of allOf(), for edge cases: zero items, and one item.
// Edge cases of allOf(): no tasks yields null; a single task yields that task's own future.
781 @SuppressWarnings("unchecked")
782 public void testAllOfEdge() throws Exception {
783 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
785 // zero items: check both using a list and using an array
786 assertNull(oper.allOf(tasks));
787 assertNull(oper.allOf());
789 // one item: check both using a list and using an array
790 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
791 tasks.add(() -> future1);
// the very same future instance is expected back, not a wrapper
793 assertSame(future1, oper.allOf(tasks));
794 assertSame(future1, oper.allOf(() -> future1));
// When a task supplier throws while the futures are being collected, the exception must propagate
// and the futures obtained so far must be cancelled; later suppliers are left untouched.
798 public void testAttachFutures() throws Exception {
799 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
801 // third task throws an exception during construction
802 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
803 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
804 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
805 tasks.add(() -> future1);
806 tasks.add(() -> future2);
// NOTE(review): the line opening the throwing supplier is not visible in this view
808 throw new IllegalStateException(EXPECTED_EXCEPTION);
810 tasks.add(() -> future3);
812 assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
814 // should have canceled the first two, but not the last
815 assertTrue(future1.isCancelled());
816 assertTrue(future2.isCancelled());
817 assertFalse(future3.isCancelled());
// Checks which outcome allOf() reports when several tasks finish with different results, when one
// outcome is null, and when one task fails exceptionally.
821 public void testCombineOutcomes() throws Exception {
// single-result cases: the only outcome (index 0) must be the one returned
823 verifyOutcomes(0, PolicyResult.SUCCESS);
824 verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
826 // maximum is in different positions
827 verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
828 verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
829 verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
831 // null outcome - takes precedence over a success
832 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
833 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
834 tasks.add(() -> CompletableFuture.completedFuture(null));
835 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
836 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
838 assertTrue(executor.runAll(MAX_REQUESTS));
839 assertTrue(result.isDone());
840 assertNull(result.get());
842 // one throws an exception during execution
843 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
// NOTE(review): the task list is presumably reset here; that statement is not visible in this view
846 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
847 tasks.add(() -> CompletableFuture.failedFuture(except));
848 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
849 result = oper.allOf(tasks);
851 assertTrue(executor.runAll(MAX_REQUESTS));
852 assertTrue(result.isCompletedExceptionally());
// NOTE(review): an AssertionError raised inside whenComplete() is absorbed by the stage it returns,
// which is discarded here, so this check cannot fail the test. Consider asserting on the exception
// obtained from the future directly - confirm against the CompletableFuture documentation.
853 result.whenComplete((unused, thrown) -> assertSame(except, thrown));
857 * Tests both flavors of sequence(), because one invokes the other.
// sequence() must yield the final outcome when every task succeeds, and must yield the failing
// outcome when a task in the middle fails; list and array forms are both exercised.
860 public void testSequence() throws Exception {
861 final OperationOutcome outcome = params.makeOutcome();
// the second supplier returns null rather than a future
863 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
864 tasks.add(() -> CompletableFuture.completedFuture(outcome));
865 tasks.add(() -> null);
866 tasks.add(() -> CompletableFuture.completedFuture(outcome));
867 tasks.add(() -> CompletableFuture.completedFuture(outcome));
869 CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
870 assertTrue(executor.runAll(MAX_REQUESTS));
871 assertTrue(result.isDone());
872 assertSame(outcome, result.get());
874 // repeat using array form
875 @SuppressWarnings("unchecked")
876 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
877 result = oper.sequence(tasks.toArray(taskArray));
878 assertTrue(executor.runAll(MAX_REQUESTS));
879 assertTrue(result.isDone());
880 assertSame(outcome, result.get());
882 // second task fails, third should not run
883 OperationOutcome failure = params.makeOutcome();
884 failure.setResult(PolicyResult.FAILURE);
// NOTE(review): the task list is presumably reset here; that statement is not visible in this view
886 tasks.add(() -> CompletableFuture.completedFuture(outcome));
887 tasks.add(() -> CompletableFuture.completedFuture(failure));
888 tasks.add(() -> CompletableFuture.completedFuture(outcome));
890 result = oper.sequence(tasks);
891 assertTrue(executor.runAll(MAX_REQUESTS));
892 assertTrue(result.isDone());
893 assertSame(failure, result.get());
897 * Tests both flavors of sequence(), for edge cases: zero items, and one item.
// Edge cases of sequence(): no tasks yields null; a single task yields that task's own future.
900 @SuppressWarnings("unchecked")
901 public void testSequenceEdge() throws Exception {
902 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
904 // zero items: check both using a list and using an array
905 assertNull(oper.sequence(tasks));
906 assertNull(oper.sequence());
908 // one item: check both using a list and using an array
909 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
910 tasks.add(() -> future1);
// the very same future instance is expected back, not a wrapper
912 assertSame(future1, oper.sequence(tasks));
913 assertSame(future1, oper.sequence(() -> future1));
// Helper: builds one already-completed task per given result, runs them through allOf() and checks
// that the outcome returned is the one created at index "expected".
//   expected - index, within results, of the outcome that allOf() should return
//   results  - result to assign to each task's outcome, in task order
916 private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
917 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
919 OperationOutcome expectedOutcome = null;
921 for (int count = 0; count < results.length; ++count) {
922 OperationOutcome outcome = params.makeOutcome();
923 outcome.setResult(results[count]);
924 tasks.add(() -> CompletableFuture.completedFuture(outcome));
// remember the instance so that identity (not just equality) can be asserted below
926 if (count == expected) {
927 expectedOutcome = outcome;
931 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
933 assertTrue(executor.runAll(MAX_REQUESTS));
934 assertTrue(result.isDone());
935 assertSame(expectedOutcome, result.get());
// Pins the priority that detmPriority() assigns to each result; a null outcome and an outcome
// with a null result are both expected to map to priority 1.
939 public void testDetmPriority() throws CoderException {
940 assertEquals(1, oper.detmPriority(null));
942 OperationOutcome outcome = params.makeOutcome();
// expected priority for each result value
944 Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
945 PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
946 PolicyResult.FAILURE_EXCEPTION, 6);
// the result's name is used as the assertion message
948 for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
949 outcome.setResult(ent.getKey());
950 assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
954 * Test null result. We can't actually set it to null, because the set() method
955 * won't allow it. Instead, we decode it from a structure.
957 outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
958 assertEquals(1, oper.detmPriority(outcome));
962 * Tests callbackStarted() when the pipeline has already been stopped.
// Cancels the pipeline from inside the start callback and expects exactly one recorded start.
965 public void testCallbackStartedNotRunning() {
// holder that lets the callback reach the future, which is only assigned after start() returns
966 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
969 * arrange to stop the controller when the start-callback is invoked, but capture
// NOTE(review): part of this callback body is not visible in this view
972 params = params.toBuilder().startCallback(oper -> {
974 future.get().cancel(false);
977 // new params, thus need a new operation
// NOTE(review): the statement that rebuilds the operation is not visible in this view
980 future.set(oper.start());
981 assertTrue(executor.runAll(MAX_REQUESTS));
983 // should have only run once
984 assertEquals(1, numStart);
988 * Tests callbackCompleted() when the pipeline has already been stopped.
// Cancels the pipeline from inside the start callback and expects that no completion was recorded.
991 public void testCallbackCompletedNotRunning() {
// holder that lets the callback reach the future, which is only assigned after start() returns
992 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
994 // arrange to stop the controller when the start-callback is invoked
995 params = params.toBuilder().startCallback(oper -> {
996 future.get().cancel(false);
999 // new params, thus need a new operation
1000 oper = new MyOper();
1002 future.set(oper.start());
1003 assertTrue(executor.runAll(MAX_REQUESTS));
1005 // should not have been set
1007 assertEquals(0, numEnd);
// setOutcome(outcome, throwable): a TimeoutException wrapped in a CompletionException must map to
// FAILURE_TIMEOUT, any other exception to FAILURE_EXCEPTION; the message is FAILED_MSG in both cases.
1011 public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1012 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1014 OperationOutcome outcome;
1016 outcome = new OperationOutcome();
1017 oper.setOutcome(outcome, timex);
1018 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1019 assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
1021 outcome = new OperationOutcome();
1022 oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1023 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1024 assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
// setOutcome(outcome, result): SUCCESS must set SUCCESS_MSG; every other result must set FAILED_MSG.
// The result is stored unchanged in both cases.
1028 public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1029 OperationOutcome outcome;
1031 outcome = new OperationOutcome();
1032 oper.setOutcome(outcome, PolicyResult.SUCCESS);
1033 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1034 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
// the result's name is used as the assertion message
1036 for (PolicyResult result : FAILURE_RESULTS) {
1037 outcome = new OperationOutcome();
1038 oper.setOutcome(outcome, result);
1039 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1040 assertEquals(result.toString(), result, outcome.getResult());
1045 public void testIsTimeout() {
1046 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
1048 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1049 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1050 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
1051 assertFalse(oper.isTimeout(new CompletionException(null)));
1052 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
1054 assertTrue(oper.isTimeout(timex));
1055 assertTrue(oper.isTimeout(new CompletionException(timex)));
1059 public void testLogMessage() {
1060 final String infraStr = SINK_INFRA.toString();
1062 // log structured data
1063 appender.clearExtractions();
1064 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1065 List<String> output = appender.getExtracted();
1066 assertEquals(1, output.size());
1068 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1069 .contains("{\n \"text\": \"my-text\"\n}");
1071 // repeat with a response
1072 appender.clearExtractions();
1073 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1074 output = appender.getExtracted();
1075 assertEquals(1, output.size());
1077 assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1078 .contains("{\n \"text\": \"my-text\"\n}");
1080 // log a plain string
1081 appender.clearExtractions();
1082 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1083 output = appender.getExtracted();
1084 assertEquals(1, output.size());
1085 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1087 // log a null request
1088 appender.clearExtractions();
1089 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1090 output = appender.getExtracted();
1091 assertEquals(1, output.size());
1093 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1095 // generate exception from coder
1096 setOperCoderException();
1098 appender.clearExtractions();
1099 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1100 output = appender.getExtracted();
1101 assertEquals(2, output.size());
1102 assertThat(output.get(0)).contains("cannot pretty-print request");
1103 assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1105 // repeat with a response
1106 appender.clearExtractions();
1107 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1108 output = appender.getExtracted();
1109 assertEquals(2, output.size());
1110 assertThat(output.get(0)).contains("cannot pretty-print response");
1111 assertThat(output.get(1)).contains(MY_SOURCE);
1115 public void testGetRetry() {
1116 assertEquals(0, oper.getRetry(null));
1117 assertEquals(10, oper.getRetry(10));
1121 public void testGetRetryWait() {
1122 // need an operator that doesn't override the retry time
1123 OperationPartial oper2 = new OperationPartial(params, config) {};
1124 assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1128 public void testGetTimeOutMs() {
1129 assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
1131 params = params.toBuilder().timeoutSec(null).build();
1133 // new params, thus need a new operation
1134 oper = new MyOper();
1136 assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
// Callback helper that receives an outcome; the parameter is named "oper" and therefore
// shadows the test's "oper" field inside this method.
1139 private void starter(OperationOutcome oper) {
// remember the outcome's start time; verifyRun() later compares it with assertSame
1141 tstart = oper.getStart();
// Callback helper that receives an outcome.
// NOTE(review): presumably the completion-side counterpart of starter() - confirm against the body.
1146 private void completer(OperationOutcome oper) {
1153 * Gets a function that does nothing.
1155 * @param <T> type of input parameter expected by the function
1156 * @return a function that does nothing
// Used by the four-argument verifyRun() as the "manipulator" it passes to the six-argument overload.
1158 private <T> Consumer<T> noop() {
// Builds an outcome from the current params and marks its result as SUCCESS.
1163 private OperationOutcome makeSuccess() {
1164 OperationOutcome outcome = params.makeOutcome();
1165 outcome.setResult(PolicyResult.SUCCESS);
// Builds an outcome from the current params and marks its result as FAILURE.
1170 private OperationOutcome makeFailure() {
1171 OperationOutcome outcome = params.makeOutcome();
1172 outcome.setResult(PolicyResult.FAILURE);
1180 * @param testName test name
1181 * @param expectedCallbacks number of callbacks expected
1182 * @param expectedOperations number of operation invocations expected
1183 * @param expectedResult expected outcome
1185 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1186 PolicyResult expectedResult) {
1188 String expectedSubRequestId =
1189 (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1191 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1197 * @param testName test name
1198 * @param expectedCallbacks number of callbacks expected
1199 * @param expectedOperations number of operation invocations expected
1200 * @param expectedResult expected outcome
1201 * @param expectedSubRequestId expected sub request ID
1202 * @param manipulator function to modify the future returned by
1203 * {@link OperationPartial#start()} before the tasks
1204 * in the executor are run
// Starts the operation, lets the caller manipulate the returned future, runs the
// executor's tasks, and then checks callback counts, recorded outcomes and the
// operation count against the expected values.
1206 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1207 String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
1215 CompletableFuture<OperationOutcome> future = oper.start();
// give the caller a chance to act on the future (e.g. cancel it) before any task runs
1217 manipulator.accept(future);
1219 assertTrue(testName, executor.runAll(MAX_REQUESTS));
// the same expected count applies to both start and end callbacks
1221 assertEquals(testName, expectedCallbacks, numStart);
1222 assertEquals(testName, expectedCallbacks, numEnd);
1224 if (expectedCallbacks > 0) {
1225 assertNotNull(testName, opstart);
1226 assertNotNull(testName, opend);
1227 assertEquals(testName, expectedResult, opend.getResult());
// assertSame: both outcomes must carry the very same start-time object that was recorded in tstart
1229 assertSame(testName, tstart, opstart.getStart());
1230 assertSame(testName, tstart, opend.getStart());
// NOTE(review): this assertion omits testName, unlike the neighbouring ones
1233 assertTrue(future.isDone());
1234 assertEquals(testName, opend, future.get());
1236 // "start" is never final
1237 for (OperationOutcome outcome : starts) {
1238 assertFalse(testName, outcome.isFinalOutcome());
1241 // only the last "complete" is final
// removeLast() takes the final outcome out of "ends", so the loop below sees only the earlier ones
1242 assertTrue(testName, ends.removeLast().isFinalOutcome());
1244 for (OperationOutcome outcome : ends) {
// NOTE(review): this assertion omits testName, unlike the neighbouring ones
1245 assertFalse(outcome.isFinalOutcome());
// NOTE(review): InterruptedException is wrapped without restoring the thread's interrupt flag
1248 } catch (InterruptedException | ExecutionException e) {
1249 throw new IllegalStateException(e);
1252 if (expectedOperations > 0) {
1253 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
1257 assertEquals(testName, expectedOperations, oper.getCount());
1261 * Creates a new {@link #oper} whose coder will throw an exception.
1263 private void setOperCoderException() {
1264 oper = new MyOper() {
1266 protected Coder makeCoder() {
1267 return new StandardCoder() {
1269 public String encode(Object object, boolean pretty) throws CoderException {
1270 throw new CoderException(EXPECTED_EXCEPTION);
// Payload type passed to logMessage() by testLogMessage().
1279 public static class MyData {
// the only field; initialized from the TEXT constant
1280 private String text = TEXT;
1284 private class MyOper extends OperationPartial {
1286 private int count = 0;
1289 private boolean genException;
1291 private int maxFailures = 0;
1293 private CompletableFuture<OperationOutcome> preProc;
1297 super(OperationPartialTest.this.params, config);
1301 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
1304 throw new IllegalStateException(EXPECTED_EXCEPTION);
1307 operation.setSubRequestId(String.valueOf(attempt));
1309 if (count > maxFailures) {
1310 operation.setResult(PolicyResult.SUCCESS);
1312 operation.setResult(PolicyResult.FAILURE);
1319 protected long getRetryWaitMs() {
1321 * Sleep timers run in the background, but we want to control things via the
1322 * "executor", thus we avoid sleep timers altogether by simply returning 0.
1328 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
1329 return (preProc != null ? preProc : super.startPreprocessorAsync());