2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertSame;
30 import static org.junit.Assert.assertTrue;
31 import static org.mockito.ArgumentMatchers.any;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
35 import ch.qos.logback.classic.Logger;
36 import java.time.Instant;
37 import java.util.Arrays;
38 import java.util.LinkedList;
39 import java.util.List;
41 import java.util.Map.Entry;
42 import java.util.UUID;
43 import java.util.concurrent.CompletableFuture;
44 import java.util.concurrent.CompletionException;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.ForkJoinPool;
47 import java.util.concurrent.Future;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.TimeoutException;
50 import java.util.concurrent.atomic.AtomicReference;
51 import java.util.function.Consumer;
52 import java.util.function.Supplier;
53 import java.util.stream.Collectors;
56 import org.junit.AfterClass;
57 import org.junit.Before;
58 import org.junit.BeforeClass;
59 import org.junit.Test;
60 import org.mockito.ArgumentCaptor;
61 import org.mockito.Mock;
62 import org.mockito.MockitoAnnotations;
63 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
64 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
65 import org.onap.policy.common.utils.coder.Coder;
66 import org.onap.policy.common.utils.coder.CoderException;
67 import org.onap.policy.common.utils.coder.StandardCoder;
68 import org.onap.policy.common.utils.test.log.logback.ExtractAppender;
69 import org.onap.policy.common.utils.time.PseudoExecutor;
70 import org.onap.policy.controlloop.ControlLoopOperation;
71 import org.onap.policy.controlloop.VirtualControlLoopEvent;
72 import org.onap.policy.controlloop.actorserviceprovider.ActorService;
73 import org.onap.policy.controlloop.actorserviceprovider.Operation;
74 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
75 import org.onap.policy.controlloop.actorserviceprovider.Operator;
76 import org.onap.policy.controlloop.actorserviceprovider.controlloop.ControlLoopEventContext;
77 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
78 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
79 import org.onap.policy.controlloop.actorserviceprovider.spi.Actor;
80 import org.onap.policy.controlloop.policy.PolicyResult;
81 import org.slf4j.LoggerFactory;
83 public class OperationPartialTest {
// Constants shared by the test cases: topic infrastructures and sink/source names used for
// logging checks, limits passed to executor.runAll(), actor/operation/target names, the
// timeout handed to timeoutSec(), and the request ID.
84 private static final CommInfrastructure SINK_INFRA = CommInfrastructure.NOOP;
85 private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.UEB;
86 private static final int MAX_REQUESTS = 100;
87 private static final int MAX_PARALLEL = 10;
88 private static final String EXPECTED_EXCEPTION = "expected exception";
89 private static final String ACTOR = "my-actor";
90 private static final String OPERATION = "my-operation";
91 private static final String MY_SINK = "my-sink";
92 private static final String MY_SOURCE = "my-source";
93 private static final String MY_TARGET_ENTITY = "my-entity";
94 private static final String TEXT = "my-text";
95 private static final int TIMEOUT = 1000;
// generated once when the class is initialized; set on the event in setUp()
96 private static final UUID REQ_ID = UUID.randomUUID();
// every PolicyResult value except SUCCESS
98 private static final List<PolicyResult> FAILURE_RESULTS = Arrays.asList(PolicyResult.values()).stream()
99 .filter(result -> result != PolicyResult.SUCCESS).collect(Collectors.toList());
102 * Used to attach an appender to the class' logger.
// Logger of OperationPartial, cast to the logback Logger type so an appender can be added to it.
104 private static final Logger logger = (Logger) LoggerFactory.getLogger(OperationPartial.class);
// Captures log output; testLogMessage clears it and reads back what was extracted.
105 private static final ExtractAppender appender = new ExtractAppender();
// Collaborators stubbed with when(...) in setUp(): the actor service and the guard
// actor -> operator -> operation chain.
108 private ActorService service;
110 private Actor guardActor;
112 private Operator guardOperator;
114 private Operation guardOperation;
// Per-test fixtures, rebuilt in setUp().
116 private VirtualControlLoopEvent event;
117 private ControlLoopEventContext context;
118 private PseudoExecutor executor;
119 private ControlLoopOperationParams params;
// Read by the tests as the number of start-callback invocations — presumably maintained by
// starter(); confirm.
123 private int numStart;
// Taken just before the timed sleep in testSleep.
126 private Instant tstart;
// Outcomes the tests inspect after a run — presumably the last ones seen by the start and
// completion callbacks; confirm.
128 private OperationOutcome opstart;
129 private OperationOutcome opend;
// Operator configuration built around the pseudo executor in setUp().
131 private OperatorConfig config;
134 * Attaches the appender to the logger.
// Wires the shared ExtractAppender into the logger of the class under test so that
// log output can be inspected by the tests.
137 public static void setUpBeforeClass() throws Exception {
139 * Attach appender to the logger.
// the appender is given the logger's context before it is added to the logger
141 appender.setContext(logger.getLoggerContext());
144 logger.addAppender(appender);
148 * Stops the appender.
// Class-level teardown counterpart of setUpBeforeClass().
// NOTE(review): expected to stop the shared appender — the body is not visible here; confirm.
151 public static void tearDownAfterClass() {
156 * Initializes the fields, including {@link #oper}.
// Per-test fixture: initializes the Mockito mocks, builds a fresh event, context, executor,
// operation parameters, and operator configuration, and stubs the guard lookup chain.
159 public void setUp() {
160 MockitoAnnotations.initMocks(this);
162 event = new VirtualControlLoopEvent();
163 event.setRequestId(REQ_ID);
165 context = new ControlLoopEventContext(event);
// the tests run this executor's queued work explicitly with executor.runAll(...)
166 executor = new PseudoExecutor();
// parameters route the start/complete callbacks to this test's starter()/completer()
168 params = ControlLoopOperationParams.builder().completeCallback(this::completer).context(context)
169 .executor(executor).actorService(service).actor(ACTOR).operation(OPERATION).timeoutSec(TIMEOUT)
170 .startCallback(this::starter).targetEntity(MY_TARGET_ENTITY).build();
// guard chain: service -> guardActor -> guardOperator -> guardOperation, whose start()
// returns an already-completed future holding makeSuccess()
172 when(service.getActor(OperationPartial.GUARD_ACTOR_NAME)).thenReturn(guardActor);
173 when(guardActor.getOperator(OperationPartial.GUARD_OPERATION_NAME)).thenReturn(guardOperator);
174 when(guardOperator.buildOperation(any())).thenReturn(guardOperation);
175 when(guardOperation.start()).thenReturn(CompletableFuture.completedFuture(makeSuccess()));
177 config = new OperatorConfig(executor);
// Checks that oper reports ACTOR as its actor name, OPERATION as its name, and
// ACTOR + "." + OPERATION as its full name.
188 public void testOperatorPartial_testGetActorName_testGetName() {
189 assertEquals(ACTOR, oper.getActorName());
190 assertEquals(OPERATION, oper.getName());
191 assertEquals(ACTOR + "." + OPERATION, oper.getFullName());
// Submits a task to the blocking executor of a locally built OperatorPartial and waits
// up to five seconds for that task to complete the future.
195 public void testGetBlockingThread() throws Exception {
196 CompletableFuture<Void> future = new CompletableFuture<>();
198 // use the real executor
199 OperatorPartial oper2 = new OperatorPartial(ACTOR, OPERATION) {
201 public Operation buildOperation(ControlLoopOperationParams params) {
206 oper2.getBlockingExecutor().execute(() -> future.complete(null));
// get() yields the null the task completed the future with; it throws if the wait times out
208 assertNull(future.get(5, TimeUnit.SECONDS));
// Delegates to verifyRun() with the test name, the counts 1 and 1, and PolicyResult.SUCCESS
// (presumably the expected start count, operation count, and final result — see verifyRun).
212 public void testStart() {
213 verifyRun("testStart", 1, 1, PolicyResult.SUCCESS);
217 * Tests start() with multiple running requests.
// Verifies the bookkeeping after MAX_PARALLEL rounds: both callbacks saw an outcome, the
// final result is SUCCESS, and the start, operation, and end counts all equal MAX_PARALLEL.
220 public void testStartMultiple() {
// NOTE(review): presumably starts one operation per iteration — the loop body is not shown here; confirm
221 for (int count = 0; count < MAX_PARALLEL; ++count) {
// run whatever has been queued on the pseudo executor; the test requires runAll to report true
225 assertTrue(executor.runAll(MAX_REQUESTS * MAX_PARALLEL));
227 assertNotNull(opstart);
228 assertNotNull(opend);
229 assertEquals(PolicyResult.SUCCESS, opend.getResult());
231 assertEquals(MAX_PARALLEL, numStart);
232 assertEquals(MAX_PARALLEL, oper.getCount());
233 assertEquals(MAX_PARALLEL, numEnd);
237 * Tests startPreprocessor() when the preprocessor returns a failure.
// Installs an already-completed preprocessor future holding makeFailure() and expects
// verifyRun to see FAILURE_GUARD with the counts 1 and 0.
240 public void testStartPreprocessorFailure() {
241 oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
243 verifyRun("testStartPreprocessorFailure", 1, 0, PolicyResult.FAILURE_GUARD);
247 * Tests startPreprocessor() when the preprocessor throws an exception.
// Installs a preprocessor future that has already failed with an IllegalStateException and
// expects verifyRun to see FAILURE_GUARD with the counts 1 and 0.
250 public void testStartPreprocessorException() {
251 // arrange for the preprocessor to throw an exception
252 oper.setPreProc(CompletableFuture.failedFuture(new IllegalStateException(EXPECTED_EXCEPTION)));
254 verifyRun("testStartPreprocessorException", 1, 0, PolicyResult.FAILURE_GUARD);
258 * Tests startPreprocessor() when the pipeline is not running.
// Cancels the future returned by start() before draining the executor, then checks that
// none of the start, operation, or end counters advanced.
261 public void testStartPreprocessorNotRunning() {
262 // arrange for the preprocessor to return success, which will be ignored
263 // oper.setGuard(CompletableFuture.completedFuture(makeSuccess()));
265 oper.start().cancel(false);
266 assertTrue(executor.runAll(MAX_REQUESTS));
271 assertEquals(0, numStart);
272 assertEquals(0, oper.getCount());
273 assertEquals(0, numEnd);
277 * Tests startPreprocessor() when the preprocessor <b>builder</b> throws an exception.
// Replaces oper with a MyOper whose startPreprocessorAsync() throws, and checks that start()
// propagates the IllegalStateException and leaves nothing queued on the executor.
280 public void testStartPreprocessorBuilderException() {
281 oper = new MyOper() {
283 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
284 throw new IllegalStateException(EXPECTED_EXCEPTION);
288 assertThatIllegalStateException().isThrownBy(() -> oper.start());
290 // should be nothing in the queue
291 assertEquals(0, executor.getQueueLength());
// Checks that startPreprocessorAsync() returns null for oper.
295 public void testStartPreprocessorAsync() {
296 assertNull(oper.startPreprocessorAsync());
// Starts the guard step and checks that the returned future is already done with SUCCESS,
// then inspects the parameters handed to the guard operator: guard actor/operation names,
// null retry and timeout, and a payload whose resource.guard entry equals makeGuardPayload().
300 public void testStartGuardAsync() throws Exception {
301 CompletableFuture<OperationOutcome> future = oper.startGuardAsync();
302 assertTrue(future.isDone());
303 assertEquals(PolicyResult.SUCCESS, future.get().getResult());
305 // verify the parameters that were passed
306 ArgumentCaptor<ControlLoopOperationParams> paramsCaptor =
307 ArgumentCaptor.forClass(ControlLoopOperationParams.class);
308 verify(guardOperator).buildOperation(paramsCaptor.capture());
// note: this overwrites the params field with the captured guard parameters
310 params = paramsCaptor.getValue();
311 assertEquals(OperationPartial.GUARD_ACTOR_NAME, params.getActor());
312 assertEquals(OperationPartial.GUARD_OPERATION_NAME, params.getOperation());
313 assertNull(params.getRetry());
314 assertNull(params.getTimeoutSec());
// the guard data is nested two levels down: payload -> "resource" -> "guard"
316 Map<String, Object> payload = params.getPayload();
317 assertNotNull(payload);
319 @SuppressWarnings("unchecked")
320 Map<String, Object> resource = (Map<String, Object>) payload.get("resource");
321 assertNotNull(resource);
323 @SuppressWarnings("unchecked")
324 Map<String, Object> guard = (Map<String, Object>) resource.get("guard");
325 assertEquals(oper.makeGuardPayload(), guard);
// Checks the guard payload: it carries the very REQ_ID instance under "requestId" and, with
// that key removed, prints as the expected actor/recipe/target map; once a closed-loop
// control name is set on the event, a clname entry follows.
329 public void testMakeGuardPayload() {
330 Map<String, Object> payload = oper.makeGuardPayload();
331 assertSame(REQ_ID, payload.get("requestId"));
333 // request id changes, so remove it
334 payload.remove("requestId");
// the comparison is on toString(), so it also pins the order of the keys
336 assertEquals("{actor=my-actor, recipe=my-operation, target=my-entity}", payload.toString());
338 // repeat, but with closed loop name
339 event.setClosedLoopControlName("my-loop");
340 payload = oper.makeGuardPayload();
341 payload.remove("requestId");
342 assertEquals("{actor=my-actor, recipe=my-operation, target=my-entity, clname=my-loop}", payload.toString());
// Checks that exactly one operation attempt is counted once the executor has been drained.
346 public void testStartOperationAsync() {
348 assertTrue(executor.runAll(MAX_REQUESTS));
350 assertEquals(1, oper.getCount());
// Checks that isSuccess() is true for a SUCCESS outcome and false for every result in
// FAILURE_RESULTS.
354 public void testIsSuccess() {
355 OperationOutcome outcome = new OperationOutcome();
357 outcome.setResult(PolicyResult.SUCCESS);
358 assertTrue(oper.isSuccess(outcome));
// the assertion message names the failing result
360 for (PolicyResult failure : FAILURE_RESULTS) {
361 outcome.setResult(failure);
362 assertFalse("testIsSuccess-" + failure, oper.isSuccess(outcome));
// Checks isActorFailed(): false for a null outcome, for SUCCESS and FAILURE_RETRIES results,
// and for a FAILURE whose actor or operation is wrong or null; true once the result is
// FAILURE and the actor and operation are ACTOR and OPERATION.
367 public void testIsActorFailed() {
368 assertFalse(oper.isActorFailed(null));
370 OperationOutcome outcome = params.makeOutcome();
// results other than FAILURE
373 outcome.setResult(PolicyResult.SUCCESS);
374 assertFalse(oper.isActorFailed(outcome));
376 outcome.setResult(PolicyResult.FAILURE_RETRIES);
377 assertFalse(oper.isActorFailed(outcome));
// from here on the result is FAILURE; only actor and operation vary
380 outcome.setResult(PolicyResult.FAILURE);
// wrong or missing actor
383 outcome.setActor(MY_SINK);
384 assertFalse(oper.isActorFailed(outcome));
385 outcome.setActor(null);
386 assertFalse(oper.isActorFailed(outcome));
387 outcome.setActor(ACTOR);
389 // incorrect operation
390 outcome.setOperation(MY_SINK);
391 assertFalse(oper.isActorFailed(outcome));
392 outcome.setOperation(null);
393 assertFalse(oper.isActorFailed(outcome));
394 outcome.setOperation(OPERATION);
// result, actor, and operation all match
397 assertTrue(oper.isActorFailed(outcome));
// Builds an OperationPartial subclass that overrides nothing and expects the completion
// outcome (opend) to carry FAILURE_EXCEPTION after the executor has been drained.
401 public void testDoOperation() {
403 * Use an operation that doesn't override doOperation().
405 OperationPartial oper2 = new OperationPartial(params, config) {};
408 assertTrue(executor.runAll(MAX_REQUESTS));
410 assertNotNull(opend);
411 assertEquals(PolicyResult.FAILURE_EXCEPTION, opend.getResult());
// Runs the operation on the common ForkJoinPool with getTimeoutMs() overridden and expects
// the outcome returned by start() to be FAILURE_TIMEOUT.
415 public void testTimeout() throws Exception {
417 // use a real executor
418 params = params.toBuilder().executor(ForkJoinPool.commonPool()).build();
420 // trigger timeout very quickly
421 oper = new MyOper() {
423 protected long getTimeoutMs(Integer timeoutSec) {
428 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
430 OperationOutcome outcome2 = params.makeOutcome();
431 outcome2.setResult(PolicyResult.SUCCESS);
434 * Create an incomplete future that will timeout after the operation's
435 * timeout. If it fires before the other timer, then it will return a
// the future is never completed directly; orTimeout fails it after one second and
// handleAsync then maps that to an outcome on the params executor
438 CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
// NOTE(review): outcome2 is built above but the handler returns `outcome`, the method
// parameter — confirm which one was intended
439 future = future.orTimeout(1, TimeUnit.SECONDS).handleAsync((unused1, unused2) -> outcome,
440 params.getExecutor());
446 assertEquals(PolicyResult.FAILURE_TIMEOUT, oper.start().get().getResult());
450 * Tests retry functions, when the count is set to zero and retries are exhausted.
// With retry set to 0 and setMaxFailures(10), expects verifyRun to report FAILURE with the
// counts 1 and 1.
453 public void testSetRetryFlag_testRetryOnFailure_ZeroRetries_testStartOperationAttempt() {
454 params = params.toBuilder().retry(0).build();
456 // new params, thus need a new operation
// presumably makes the first ten attempts fail — setMaxFailures is defined elsewhere; confirm
459 oper.setMaxFailures(10);
461 verifyRun("testSetRetryFlag_testRetryOnFailure_ZeroRetries", 1, 1, PolicyResult.FAILURE);
465 * Tests retry functions, when the count is null and retries are exhausted.
// With retry set to null and setMaxFailures(10), expects verifyRun to report FAILURE with
// the counts 1 and 1 — the same expectation as the zero-retry case.
468 public void testSetRetryFlag_testRetryOnFailure_NullRetries() {
469 params = params.toBuilder().retry(null).build();
471 // new params, thus need a new operation
474 oper.setMaxFailures(10);
476 verifyRun("testSetRetryFlag_testRetryOnFailure_NullRetries", 1, 1, PolicyResult.FAILURE);
480 * Tests retry functions, when retries are exhausted.
// With retry set to 3 and setMaxFailures(10), expects verifyRun to report FAILURE_RETRIES
// with both counts equal to maxRetries + 1 (the initial attempt plus each retry).
483 public void testSetRetryFlag_testRetryOnFailure_RetriesExhausted() {
484 final int maxRetries = 3;
485 params = params.toBuilder().retry(maxRetries).build();
487 // new params, thus need a new operation
490 oper.setMaxFailures(10);
492 verifyRun("testSetRetryFlag_testRetryOnFailure_RetriesExhausted", maxRetries + 1, maxRetries + 1,
493 PolicyResult.FAILURE_RETRIES);
497 * Tests retry functions, when a success follows some retries.
// With retry set to 10 and setMaxFailures(3), expects verifyRun to report SUCCESS with both
// counts equal to maxFailures + 1.
500 public void testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries() {
501 params = params.toBuilder().retry(10).build();
503 // new params, thus need a new operation
// fewer failures than allowed retries, so a later attempt is expected to succeed
506 final int maxFailures = 3;
507 oper.setMaxFailures(maxFailures);
509 verifyRun("testSetRetryFlag_testRetryOnFailure_SuccessAfterRetries", maxFailures + 1, maxFailures + 1,
510 PolicyResult.SUCCESS);
514 * Tests retry functions, when the outcome is {@code null}.
// Uses a MyOper whose doOperation() still invokes the superclass implementation and expects
// verifyRun to report FAILURE with the counts 1 and 1.
517 public void testSetRetryFlag_testRetryOnFailure_NullOutcome() {
519 // arrange to return null from doOperation()
520 oper = new MyOper() {
522 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
525 super.doOperation(attempt, operation);
// six-argument verifyRun form: null and noop() are the two extra arguments
530 verifyRun("testSetRetryFlag_testRetryOnFailure_NullOutcome", 1, 1, PolicyResult.FAILURE, null, noop());
// Checks sleep(): a negative or zero delay yields an already-completed future holding null,
// while a 100 ms sleep is still pending at the first check and has taken at least 99 ms
// by the time the elapsed time is measured.
534 public void testSleep() throws Exception {
535 CompletableFuture<Void> future = oper.sleep(-1, TimeUnit.SECONDS);
536 assertTrue(future.isDone());
537 assertNull(future.get());
540 future = oper.sleep(0, TimeUnit.SECONDS);
541 assertTrue(future.isDone());
542 assertNull(future.get());
545 * Start a second sleep we can use to check the first while it's running.
547 tstart = Instant.now();
548 future = oper.sleep(100, TimeUnit.MILLISECONDS);
550 CompletableFuture<Void> future2 = oper.sleep(10, TimeUnit.MILLISECONDS);
552 // wait for second to complete and verify that the first has not completed
554 assertFalse(future.isDone());
556 // wait for first to complete
// elapsed time since the 100 ms sleep was started; 99 rather than 100 leaves 1 ms of slack
559 long diff = Instant.now().toEpochMilli() - tstart.toEpochMilli();
560 assertTrue(diff >= 99);
// Checks isSameOperation(): false for a null outcome and whenever the actor or the operation
// is null or different; true when both are ACTOR and OPERATION.
564 public void testIsSameOperation() {
565 assertFalse(oper.isSameOperation(null));
567 OperationOutcome outcome = params.makeOutcome();
569 // wrong actor - should be false
570 outcome.setActor(null);
571 assertFalse(oper.isSameOperation(outcome));
572 outcome.setActor(MY_SINK);
573 assertFalse(oper.isSameOperation(outcome));
574 outcome.setActor(ACTOR);
576 // wrong operation - should be false
577 outcome.setOperation(null);
578 assertFalse(oper.isSameOperation(outcome));
579 outcome.setOperation(MY_SINK);
580 assertFalse(oper.isSameOperation(outcome));
581 outcome.setOperation(OPERATION);
// actor and operation both match
583 assertTrue(oper.isSameOperation(outcome));
587 * Tests handleFailure() when the outcome is a success.
// Installs an already-completed preprocessor future holding makeSuccess() and expects
// verifyRun to report SUCCESS with the counts 1 and 1.
590 public void testHandlePreprocessorFailureTrue() {
591 oper.setPreProc(CompletableFuture.completedFuture(makeSuccess()));
592 verifyRun("testHandlePreprocessorFailureTrue", 1, 1, PolicyResult.SUCCESS);
596 * Tests handleFailure() when the outcome is <i>not</i> a success.
// Installs an already-completed preprocessor future holding makeFailure() and expects
// verifyRun to report FAILURE_GUARD with the counts 1 and 0.
599 public void testHandlePreprocessorFailureFalse() throws Exception {
600 oper.setPreProc(CompletableFuture.completedFuture(makeFailure()));
601 verifyRun("testHandlePreprocessorFailureFalse", 1, 0, PolicyResult.FAILURE_GUARD);
605 * Tests handleFailure() when the outcome is {@code null}.
// Installs a preprocessor future completed with null and expects verifyRun to report
// FAILURE_GUARD with the counts 1 and 0, the same as for an explicit failure outcome.
608 public void testHandlePreprocessorFailureNull() throws Exception {
609 // arrange to return a null outcome from the preprocessor
610 oper.setPreProc(CompletableFuture.completedFuture(null));
611 verifyRun("testHandlePreprocessorFailureNull", 1, 0, PolicyResult.FAILURE_GUARD);
// Turns on exception generation in the test operation and expects verifyRun to report
// FAILURE_EXCEPTION with the counts 1 and 1.
615 public void testFromException() {
616 // arrange to generate an exception when operation runs
617 oper.setGenException(true);
619 verifyRun("testFromException", 1, 1, PolicyResult.FAILURE_EXCEPTION);
623 * Tests fromException() when there is no exception.
// Runs the operation without arranging any exception and expects verifyRun to report
// SUCCESS with the counts 1 and 1.
626 public void testFromExceptionNoExcept() {
627 verifyRun("testFromExceptionNoExcept", 1, 1, PolicyResult.SUCCESS);
631 * Tests both flavors of anyOf(), because one invokes the other.
// Exercises anyOf() in list and array form: whichever position holds the already-completed
// future, the combined future finishes with that same outcome instance. The first task list
// also contains a supplier that returns null.
634 public void testAnyOf() throws Exception {
635 // first task completes, others do not
636 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
638 final OperationOutcome outcome = params.makeOutcome();
640 tasks.add(() -> CompletableFuture.completedFuture(outcome));
641 tasks.add(() -> new CompletableFuture<>());
642 tasks.add(() -> null);
643 tasks.add(() -> new CompletableFuture<>());
645 CompletableFuture<OperationOutcome> result = oper.anyOf(tasks);
646 assertTrue(executor.runAll(MAX_REQUESTS));
647 assertTrue(result.isDone());
648 assertSame(outcome, result.get());
650 // repeat using array form
651 @SuppressWarnings("unchecked")
652 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
653 result = oper.anyOf(tasks.toArray(taskArray));
654 assertTrue(executor.runAll(MAX_REQUESTS));
655 assertTrue(result.isDone());
656 assertSame(outcome, result.get());
658 // second task completes, others do not
660 tasks.add(() -> new CompletableFuture<>());
661 tasks.add(() -> CompletableFuture.completedFuture(outcome));
662 tasks.add(() -> new CompletableFuture<>());
664 result = oper.anyOf(tasks);
665 assertTrue(executor.runAll(MAX_REQUESTS));
666 assertTrue(result.isDone());
667 assertSame(outcome, result.get());
669 // third task completes, others do not
671 tasks.add(() -> new CompletableFuture<>());
672 tasks.add(() -> new CompletableFuture<>());
673 tasks.add(() -> CompletableFuture.completedFuture(outcome));
675 result = oper.anyOf(tasks);
676 assertTrue(executor.runAll(MAX_REQUESTS));
677 assertTrue(result.isDone());
678 assertSame(outcome, result.get());
682 * Tests both flavors of anyOf(), for edge cases: zero items, and one item.
// Edge cases for anyOf(): with no tasks it returns null; with a single task it returns that
// task's own future rather than a wrapper.
685 @SuppressWarnings("unchecked")
686 public void testAnyOfEdge() throws Exception {
687 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
689 // zero items: check both using a list and using an array
690 assertNull(oper.anyOf(tasks));
691 assertNull(oper.anyOf());
693 // one item: check both using a list and using an array
694 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
695 tasks.add(() -> future1);
697 assertSame(future1, oper.anyOf(tasks));
698 assertSame(future1, oper.anyOf(() -> future1));
// Checks the varargs form of allOf(): the combined future stays pending until all three
// futures have been completed (in the order 1, 3, 2) and then yields the shared outcome.
// One of the suppliers returns null.
702 public void testAllOfArray() throws Exception {
703 final OperationOutcome outcome = params.makeOutcome();
705 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
706 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
707 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
709 @SuppressWarnings("unchecked")
710 CompletableFuture<OperationOutcome> result =
711 oper.allOf(() -> future1, () -> future2, () -> null, () -> future3);
// nothing completed yet
713 assertTrue(executor.runAll(MAX_REQUESTS));
714 assertFalse(result.isDone());
715 future1.complete(outcome);
717 // complete 3 before 2
718 assertTrue(executor.runAll(MAX_REQUESTS));
719 assertFalse(result.isDone());
720 future3.complete(outcome);
// still waiting on future2
722 assertTrue(executor.runAll(MAX_REQUESTS));
723 assertFalse(result.isDone());
724 future2.complete(outcome);
726 // all of them are now done
727 assertTrue(executor.runAll(MAX_REQUESTS));
728 assertTrue(result.isDone());
729 assertSame(outcome, result.get());
// Checks the list form of allOf() with the same scenario as the varargs test: the combined
// future stays pending until all three futures have been completed (in the order 1, 3, 2)
// and then yields the shared outcome. One of the suppliers returns null.
733 public void testAllOfList() throws Exception {
734 final OperationOutcome outcome = params.makeOutcome();
736 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
737 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
738 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
740 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
741 tasks.add(() -> future1);
742 tasks.add(() -> future2);
743 tasks.add(() -> null);
744 tasks.add(() -> future3);
746 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
// nothing completed yet
748 assertTrue(executor.runAll(MAX_REQUESTS));
749 assertFalse(result.isDone());
750 future1.complete(outcome);
752 // complete 3 before 2
753 assertTrue(executor.runAll(MAX_REQUESTS));
754 assertFalse(result.isDone());
755 future3.complete(outcome);
// still waiting on future2
757 assertTrue(executor.runAll(MAX_REQUESTS));
758 assertFalse(result.isDone());
759 future2.complete(outcome);
761 // all of them are now done
762 assertTrue(executor.runAll(MAX_REQUESTS));
763 assertTrue(result.isDone());
764 assertSame(outcome, result.get());
768 * Tests both flavors of allOf(), for edge cases: zero items, and one item.
// Edge cases for allOf(): with no tasks it returns null; with a single task it returns that
// task's own future rather than a wrapper.
771 @SuppressWarnings("unchecked")
772 public void testAllOfEdge() throws Exception {
773 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
775 // zero items: check both using a list and using an array
776 assertNull(oper.allOf(tasks));
777 assertNull(oper.allOf());
779 // one item: check both using a list and using an array
780 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
781 tasks.add(() -> future1);
783 assertSame(future1, oper.allOf(tasks));
784 assertSame(future1, oper.allOf(() -> future1));
// Checks that when one of the task suppliers throws, anyOf() rethrows the exception with its
// message and cancels the futures obtained before the failure, while the future of the task
// listed after it is left uncancelled.
788 public void testAttachFutures() throws Exception {
789 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
791 // third task throws an exception during construction
792 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
793 CompletableFuture<OperationOutcome> future2 = new CompletableFuture<>();
794 CompletableFuture<OperationOutcome> future3 = new CompletableFuture<>();
795 tasks.add(() -> future1);
796 tasks.add(() -> future2);
798 throw new IllegalStateException(EXPECTED_EXCEPTION);
800 tasks.add(() -> future3);
802 assertThatIllegalStateException().isThrownBy(() -> oper.anyOf(tasks)).withMessage(EXPECTED_EXCEPTION);
804 // should have canceled the first two, but not the last
805 assertTrue(future1.isCancelled());
806 assertTrue(future2.isCancelled());
807 assertFalse(future3.isCancelled());
// Checks how allOf() combines outcomes: via verifyOutcomes() for single results and for the
// winning result in each position, then directly for a null outcome among successes (the
// combined result is null) and for a task whose future failed (the combined future
// completes exceptionally).
811 public void testCombineOutcomes() throws Exception {
// first argument is the index of the outcome expected to be returned (see verifyOutcomes)
813 verifyOutcomes(0, PolicyResult.SUCCESS);
814 verifyOutcomes(0, PolicyResult.FAILURE_EXCEPTION);
816 // maximum is in different positions
817 verifyOutcomes(0, PolicyResult.FAILURE, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD);
818 verifyOutcomes(1, PolicyResult.SUCCESS, PolicyResult.FAILURE, PolicyResult.FAILURE_GUARD);
819 verifyOutcomes(2, PolicyResult.SUCCESS, PolicyResult.FAILURE_GUARD, PolicyResult.FAILURE);
821 // null outcome - takes precedence over a success
822 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
823 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
824 tasks.add(() -> CompletableFuture.completedFuture(null));
825 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
826 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
828 assertTrue(executor.runAll(MAX_REQUESTS));
829 assertTrue(result.isDone());
830 assertNull(result.get());
832 // one throws an exception during execution
833 IllegalStateException except = new IllegalStateException(EXPECTED_EXCEPTION);
836 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
837 tasks.add(() -> CompletableFuture.failedFuture(except));
838 tasks.add(() -> CompletableFuture.completedFuture(params.makeOutcome()));
839 result = oper.allOf(tasks);
841 assertTrue(executor.runAll(MAX_REQUESTS));
842 assertTrue(result.isCompletedExceptionally());
// NOTE(review): this assertion runs inside the whenComplete callback and the stage it
// returns is discarded, so a mismatch would be captured by that stage instead of failing
// the test — consider asserting on the exception obtained from result directly
843 result.whenComplete((unused, thrown) -> assertSame(except, thrown));
847 * Tests both flavors of sequence(), because one invokes the other.
// Exercises sequence() in list and array form: with every task yielding the shared outcome
// (and one supplier returning null) the result is that outcome; when a task yields a FAILURE
// outcome, that failure instance is the result.
850 public void testSequence() throws Exception {
851 final OperationOutcome outcome = params.makeOutcome();
853 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
854 tasks.add(() -> CompletableFuture.completedFuture(outcome));
855 tasks.add(() -> null);
856 tasks.add(() -> CompletableFuture.completedFuture(outcome));
857 tasks.add(() -> CompletableFuture.completedFuture(outcome));
859 CompletableFuture<OperationOutcome> result = oper.sequence(tasks);
860 assertTrue(executor.runAll(MAX_REQUESTS));
861 assertTrue(result.isDone());
862 assertSame(outcome, result.get());
864 // repeat using array form
865 @SuppressWarnings("unchecked")
866 Supplier<CompletableFuture<OperationOutcome>>[] taskArray = new Supplier[tasks.size()];
867 result = oper.sequence(tasks.toArray(taskArray));
868 assertTrue(executor.runAll(MAX_REQUESTS));
869 assertTrue(result.isDone());
870 assertSame(outcome, result.get());
872 // second task fails, third should not run
873 OperationOutcome failure = params.makeOutcome();
874 failure.setResult(PolicyResult.FAILURE);
876 tasks.add(() -> CompletableFuture.completedFuture(outcome));
877 tasks.add(() -> CompletableFuture.completedFuture(failure));
878 tasks.add(() -> CompletableFuture.completedFuture(outcome));
880 result = oper.sequence(tasks);
881 assertTrue(executor.runAll(MAX_REQUESTS));
882 assertTrue(result.isDone());
// only the returned outcome is checked here; whether the third task ran is not asserted
883 assertSame(failure, result.get());
887 * Tests both flavors of sequence(), for edge cases: zero items, and one item.
// Edge cases for sequence(): with no tasks it returns null; with a single task it returns
// that task's own future rather than a wrapper.
890 @SuppressWarnings("unchecked")
891 public void testSequenceEdge() throws Exception {
892 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
894 // zero items: check both using a list and using an array
895 assertNull(oper.sequence(tasks));
896 assertNull(oper.sequence());
898 // one item: check both using a list and using an array
899 CompletableFuture<OperationOutcome> future1 = new CompletableFuture<>();
900 tasks.add(() -> future1);
902 assertSame(future1, oper.sequence(tasks));
903 assertSame(future1, oper.sequence(() -> future1));
// Helper: builds one already-completed task per entry of results, combines them with
// allOf(), and asserts that the combined outcome is the very instance created for the entry
// at index expected.
//   expected - index into results of the outcome that allOf() should return
//   results  - result assigned to each task's outcome, in task order
906 private void verifyOutcomes(int expected, PolicyResult... results) throws Exception {
907 List<Supplier<CompletableFuture<OperationOutcome>>> tasks = new LinkedList<>();
// stays null if expected is not a valid index, in which case a null result is expected
909 OperationOutcome expectedOutcome = null;
911 for (int count = 0; count < results.length; ++count) {
912 OperationOutcome outcome = params.makeOutcome();
913 outcome.setResult(results[count]);
914 tasks.add(() -> CompletableFuture.completedFuture(outcome));
916 if (count == expected) {
917 expectedOutcome = outcome;
921 CompletableFuture<OperationOutcome> result = oper.allOf(tasks);
923 assertTrue(executor.runAll(MAX_REQUESTS));
924 assertTrue(result.isDone());
925 assertSame(expectedOutcome, result.get());
// Pins detmPriority(): 1 for a null outcome and for an outcome whose result is null;
// 0 for SUCCESS; 2 through 6 for FAILURE_GUARD, FAILURE_RETRIES, FAILURE, FAILURE_TIMEOUT,
// and FAILURE_EXCEPTION respectively.
929 public void testDetmPriority() throws CoderException {
930 assertEquals(1, oper.detmPriority(null));
932 OperationOutcome outcome = params.makeOutcome();
// expected priority for each result
934 Map<PolicyResult, Integer> map = Map.of(PolicyResult.SUCCESS, 0, PolicyResult.FAILURE_GUARD, 2,
935 PolicyResult.FAILURE_RETRIES, 3, PolicyResult.FAILURE, 4, PolicyResult.FAILURE_TIMEOUT, 5,
936 PolicyResult.FAILURE_EXCEPTION, 6);
// the assertion message is the name of the result being checked
938 for (Entry<PolicyResult, Integer> ent : map.entrySet()) {
939 outcome.setResult(ent.getKey());
940 assertEquals(ent.getKey().toString(), ent.getValue().intValue(), oper.detmPriority(outcome));
944 * Test null result. We can't actually set it to null, because the set() method
945 * won't allow it. Instead, we decode it from a structure.
947 outcome = new StandardCoder().decode("{\"result\":null}", OperationOutcome.class);
948 assertEquals(1, oper.detmPriority(outcome));
952 * Tests callbackStarted() when the pipeline has already been stopped.
// Cancels the operation's future from inside the start callback and checks that the start
// counter ends at 1.
955 public void testCallbackStartedNotRunning() {
// holder so that the callback, defined before start() is called, can reach the future
956 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
959 * arrange to stop the controller when the start-callback is invoked, but capture
// the lambda parameter `oper` shadows the field of the same name inside the callback
962 params = params.toBuilder().startCallback(oper -> {
964 future.get().cancel(false);
967 // new params, thus need a new operation
970 future.set(oper.start());
971 assertTrue(executor.runAll(MAX_REQUESTS));
973 // should have only run once
974 assertEquals(1, numStart);
978 * Tests callbackCompleted() when the pipeline has already been stopped.
// Cancels the operation's future from inside the start callback and checks that the end
// counter stays at 0.
981 public void testCallbackCompletedNotRunning() {
// holder so that the callback, defined before start() is called, can reach the future
982 AtomicReference<Future<OperationOutcome>> future = new AtomicReference<>();
984 // arrange to stop the controller when the start-callback is invoked
985 params = params.toBuilder().startCallback(oper -> {
986 future.get().cancel(false);
989 // new params, thus need a new operation
992 future.set(oper.start());
993 assertTrue(executor.runAll(MAX_REQUESTS));
995 // should not have been set
997 assertEquals(0, numEnd);
// Checks setOutcome(outcome, Throwable): a CompletionException wrapping a TimeoutException
// yields FAILURE_TIMEOUT and an IllegalStateException yields FAILURE_EXCEPTION; both set the
// FAILED_MSG message.
1001 public void testSetOutcomeControlLoopOperationOutcomeThrowable() {
1002 final CompletionException timex = new CompletionException(new TimeoutException(EXPECTED_EXCEPTION));
1004 OperationOutcome outcome;
// timeout case
1006 outcome = new OperationOutcome();
1007 oper.setOutcome(outcome, timex);
1008 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1009 assertEquals(PolicyResult.FAILURE_TIMEOUT, outcome.getResult());
// non-timeout case
1011 outcome = new OperationOutcome();
1012 oper.setOutcome(outcome, new IllegalStateException(EXPECTED_EXCEPTION));
1013 assertEquals(ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1014 assertEquals(PolicyResult.FAILURE_EXCEPTION, outcome.getResult());
// Checks setOutcome(outcome, PolicyResult): SUCCESS sets SUCCESS_MSG and every result in
// FAILURE_RESULTS sets FAILED_MSG; in each case the given result is stored on the outcome.
1018 public void testSetOutcomeControlLoopOperationOutcomePolicyResult() {
1019 OperationOutcome outcome;
1021 outcome = new OperationOutcome();
1022 oper.setOutcome(outcome, PolicyResult.SUCCESS);
1023 assertEquals(ControlLoopOperation.SUCCESS_MSG, outcome.getMessage());
1024 assertEquals(PolicyResult.SUCCESS, outcome.getResult());
// a fresh outcome per result; the assertion message names the result being checked
1026 for (PolicyResult result : FAILURE_RESULTS) {
1027 outcome = new OperationOutcome();
1028 oper.setOutcome(outcome, result);
1029 assertEquals(result.toString(), ControlLoopOperation.FAILED_MSG, outcome.getMessage());
1030 assertEquals(result.toString(), result, outcome.getResult());
1035 public void testIsTimeout() {
// Checks which exception shapes isTimeout() is expected to accept: per the assertions
// below, only a bare TimeoutException or one wrapped directly in a single
// CompletionException.
1036 final TimeoutException timex = new TimeoutException(EXPECTED_EXCEPTION);
// rejected: an unrelated exception, and a timeout hidden beneath an IllegalStateException
1038 assertFalse(oper.isTimeout(new IllegalStateException(EXPECTED_EXCEPTION)));
1039 assertFalse(oper.isTimeout(new IllegalStateException(timex)));
1040 assertFalse(oper.isTimeout(new CompletionException(new IllegalStateException(timex))));
// rejected: a CompletionException that has no cause
1041 assertFalse(oper.isTimeout(new CompletionException(null)));
// rejected: a timeout wrapped in two CompletionException layers
1042 assertFalse(oper.isTimeout(new CompletionException(new CompletionException(timex))));
// accepted: the timeout itself, or the timeout wrapped exactly once
1044 assertTrue(oper.isTimeout(timex));
1045 assertTrue(oper.isTimeout(new CompletionException(timex)));
1049 public void testLogMessage() {
// Drives logMessage() with structured data, a plain string, null, and finally a coder
// that throws, checking after each call what the log appender captured.
1050 final String infraStr = SINK_INFRA.toString();
1052 // log structured data
// clear the appender first so that only this call's output is counted
1053 appender.clearExtractions();
1054 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1055 List<String> output = appender.getExtracted();
1056 assertEquals(1, output.size());
// the single entry must contain the infrastructure, the sink, the direction and the
// encoded MyData text
1058 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("OUT")
1059 .contains("{\n \"text\": \"my-text\"\n}");
1061 // repeat with a response
1062 appender.clearExtractions();
1063 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1064 output = appender.getExtracted();
1065 assertEquals(1, output.size());
1067 assertThat(output.get(0)).contains(SOURCE_INFRA.toString()).contains(MY_SOURCE).contains("IN")
1068 .contains("{\n \"text\": \"my-text\"\n}");
1070 // log a plain string
1071 appender.clearExtractions();
1072 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, TEXT);
1073 output = appender.getExtracted();
1074 assertEquals(1, output.size());
// the string is expected to appear in the entry as passed
1075 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains(TEXT);
1077 // log a null request
1078 appender.clearExtractions();
1079 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, null);
1080 output = appender.getExtracted();
1081 assertEquals(1, output.size());
// a null payload is expected to show up as the text "null"
1083 assertThat(output.get(0)).contains(infraStr).contains(MY_SINK).contains("null");
1085 // generate exception from coder
// from here on, oper is a MyOper whose coder throws on encode
1086 setOperCoderException();
1088 appender.clearExtractions();
1089 oper.logMessage(EventType.OUT, SINK_INFRA, MY_SINK, new MyData());
1090 output = appender.getExtracted();
// two entries are expected now: the pretty-print failure first, then the message itself
1091 assertEquals(2, output.size());
1092 assertThat(output.get(0)).contains("cannot pretty-print request");
1093 assertThat(output.get(1)).contains(infraStr).contains(MY_SINK);
1095 // repeat with a response
1096 appender.clearExtractions();
1097 oper.logMessage(EventType.IN, SOURCE_INFRA, MY_SOURCE, new MyData());
1098 output = appender.getExtracted();
1099 assertEquals(2, output.size());
// for EventType.IN the failure text says "response" rather than "request"
1100 assertThat(output.get(0)).contains("cannot pretty-print response");
1101 assertThat(output.get(1)).contains(MY_SOURCE);
1105 public void testGetRetry() {
// a null retry setting is expected to yield 0; a non-null value is expected back unchanged
1106 assertEquals(0, oper.getRetry(null));
1107 assertEquals(10, oper.getRetry(10));
1111 public void testGetRetryWait() {
// MyOper overrides getRetryWaitMs(), so the default is checked on a bare anonymous
// OperationPartial subclass instead of on "oper"
1112 // need an operation that doesn't override the retry wait time
1113 OperationPartial oper2 = new OperationPartial(params, config) {};
1114 assertEquals(OperationPartial.DEFAULT_RETRY_WAIT_MS, oper2.getRetryWaitMs());
1118 public void testGetTimeOutMs() {
// the timeout from params (in seconds) is expected back in milliseconds: TIMEOUT * 1000
1119 assertEquals(TIMEOUT * 1000, oper.getTimeoutMs(params.getTimeoutSec()));
// rebuild the params with a null timeout
1121 params = params.toBuilder().timeoutSec(null).build();
1123 // new params, thus need a new operation
1124 oper = new MyOper();
// a null timeout is expected to yield 0
1126 assertEquals(0, oper.getTimeoutMs(params.getTimeoutSec()));
1129 private void starter(OperationOutcome oper) {
// remember the start time carried by the outcome; verifyRun() later compares the
// captured outcomes' start times against tstart
1131 tstart = oper.getStart();
1135 private void completer(OperationOutcome oper) {
// NOTE(review): judging by its name this is the end-callback counterpart of starter();
// presumably it maintains numEnd and opend, which verifyRun() checks - confirm
1141 * Gets a function that does nothing.
1143 * @param <T> type of input parameter expected by the function
1144 * @return a function that does nothing
1146 private <T> Consumer<T> noop() {
// passed by the four-argument verifyRun() as the "manipulator" argument of the
// six-argument overload
1151 private OperationOutcome makeSuccess() {
// start from an outcome built by the current params, then mark it SUCCESS
1152 OperationOutcome outcome = params.makeOutcome();
1153 outcome.setResult(PolicyResult.SUCCESS);
1158 private OperationOutcome makeFailure() {
// start from an outcome built by the current params, then mark it FAILURE
1159 OperationOutcome outcome = params.makeOutcome();
1160 outcome.setResult(PolicyResult.FAILURE);
1168 * @param testName test name
1169 * @param expectedCallbacks number of callbacks expected
1170 * @param expectedOperations number of operation invocations expected
1171 * @param expectedResult expected outcome
1173 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations,
1174 PolicyResult expectedResult) {
// Convenience overload: derives the expected sub request ID, then delegates to the
// six-argument verifyRun() with a no-op manipulator.
// no sub request ID is expected for FAILURE_EXCEPTION (presumably because doOperation()
// throws before assigning one - confirm); otherwise it is expectedOperations as a string
1176 String expectedSubRequestId =
1177 (expectedResult == PolicyResult.FAILURE_EXCEPTION ? null : String.valueOf(expectedOperations));
1179 verifyRun(testName, expectedCallbacks, expectedOperations, expectedResult, expectedSubRequestId, noop());
1185 * @param testName test name
1186 * @param expectedCallbacks number of callbacks expected
1187 * @param expectedOperations number of operation invocations expected
1188 * @param expectedResult expected outcome
1189 * @param expectedSubRequestId expected sub request ID
1190 * @param manipulator function to modify the future returned by
1191 * {@link OperationPartial#start()} before the tasks
1192 * in the executor are run
1194 private void verifyRun(String testName, int expectedCallbacks, int expectedOperations, PolicyResult expectedResult,
1195 String expectedSubRequestId, Consumer<CompletableFuture<OperationOutcome>> manipulator) {
// Starts the operation, hands the returned future to the manipulator, runs the
// executor, then checks the callback counts, the captured outcomes and the
// operation's invocation count. testName is the message of most assertions.
1197 CompletableFuture<OperationOutcome> future = oper.start();
// the manipulator sees the future before executor.runAll() is invoked
1199 manipulator.accept(future);
1201 assertTrue(testName, executor.runAll(MAX_REQUESTS));
// numStart and numEnd are both compared against the same expected callback count
1203 assertEquals(testName, expectedCallbacks, numStart);
1204 assertEquals(testName, expectedCallbacks, numEnd);
// outcome details are only checked when callbacks were expected at all
1206 if (expectedCallbacks > 0) {
1207 assertNotNull(testName, opstart);
1208 assertNotNull(testName, opend);
1209 assertEquals(testName, expectedResult, opend.getResult());
// both captured outcomes must carry the very same start-time instance as tstart
1211 assertSame(testName, tstart, opstart.getStart());
1212 assertSame(testName, tstart, opend.getStart());
// the future must already be complete and must yield the same instance as opend
1215 assertTrue(future.isDone());
1216 assertSame(testName, opend, future.get());
// future.get() declares checked exceptions; rethrow them unchecked, keeping the cause
1218 } catch (InterruptedException | ExecutionException e) {
1219 throw new IllegalStateException(e);
// the sub request ID is only compared when at least one operation invocation was expected
1222 if (expectedOperations > 0) {
1223 assertEquals(testName, expectedSubRequestId, opend.getSubRequestId());
// NOTE(review): getCount() presumably reports how often doOperation() ran - confirm
1227 assertEquals(testName, expectedOperations, oper.getCount());
1231 * Creates a new {@link #oper} whose coder will throw an exception.
1233 private void setOperCoderException() {
// replace "oper" with a MyOper whose makeCoder() hands back a coder that cannot encode
1234 oper = new MyOper() {
1236 protected Coder makeCoder() {
1237 return new StandardCoder() {
// every encode attempt throws; testLogMessage() uses this to check the
// "cannot pretty-print" log output
1239 public String encode(Object object, boolean pretty) throws CoderException {
1240 throw new CoderException(EXPECTED_EXCEPTION);
1249 public static class MyData {
// object passed to logMessage() by testLogMessage(); its single field is initialized to TEXT
1250 private String text = TEXT;
1254 private class MyOper extends OperationPartial {
// Operation implementation used by these tests; it is an inner class, so it can read
// the enclosing test's params and config.
// compared with maxFailures in doOperation(); verifyRun() reads it through getCount()
1256 private int count = 0;
// NOTE(review): presumably makes doOperation() throw when set - confirm
1259 private boolean genException;
// doOperation() reports SUCCESS only once count is greater than this value
1261 private int maxFailures = 0;
// when non-null, startPreprocessorAsync() returns this instead of the superclass result
1263 private CompletableFuture<OperationOutcome> preProc;
// construct from the enclosing test instance's current params and config
1267 super(OperationPartialTest.this.params, config);
1271 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
// NOTE(review): the condition guarding this throw is not visible here; presumably it is
// the genException flag - confirm
1274 throw new IllegalStateException(EXPECTED_EXCEPTION);
// tag the outcome with the attempt number as its sub request ID
1277 operation.setSubRequestId(String.valueOf(attempt));
// SUCCESS once count exceeds maxFailures, FAILURE otherwise
1279 if (count > maxFailures) {
1280 operation.setResult(PolicyResult.SUCCESS);
1282 operation.setResult(PolicyResult.FAILURE);
1289 protected long getRetryWaitMs() {
// overridden so that retries in these tests never wait on a real timer
1291 * Sleep timers run in the background, but we want to control things via the
1292 * "executor", thus we avoid sleep timers altogether by simply returning 0.
1298 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
// a preProc future, when one has been set, takes precedence over the superclass preprocessor
1299 return (preProc != null ? preProc : super.startPreprocessorAsync());