2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedHashMap;
27 import java.util.LinkedList;
28 import java.util.List;
30 import java.util.Queue;
31 import java.util.concurrent.CancellationException;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.CompletionException;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import java.util.function.BiConsumer;
38 import java.util.function.Function;
39 import java.util.function.Supplier;
40 import java.util.function.UnaryOperator;
42 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
43 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
44 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
45 import org.onap.policy.common.utils.coder.Coder;
46 import org.onap.policy.common.utils.coder.CoderException;
47 import org.onap.policy.common.utils.coder.StandardCoder;
48 import org.onap.policy.controlloop.ControlLoopOperation;
49 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
50 import org.onap.policy.controlloop.actorserviceprovider.Operation;
51 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
52 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
53 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
54 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
55 import org.onap.policy.controlloop.policy.PolicyResult;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
60 * Partial implementation of an operator. In general, it's preferable that subclasses
61 * would override {@link #startOperationAsync(int, OperationOutcome)
62 * startOperationAsync()}. However, if that proves to be too difficult, then they can
63 * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
64 * if the operation requires any preprocessor steps, the subclass may choose to override
65 * {@link #startPreprocessorAsync()}.
67 * The futures returned by the methods within this class can be canceled, and will
68 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
69 * returned by overridden methods will do the same. Of course, if a class overrides
70 * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
71 * be done to cancel that particular operation.
73 * In general tasks in a pipeline are executed by the same thread. However, the following
74 * should always be executed via the executor specified in "params":
76 * <li>start callback</li>
77 * <li>completion callback</li>
78 * <li>controller completion (i.e., delayedComplete())</li>
81 public abstract class OperationPartial implements Operation {
82 private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
83 private static final Coder coder = new StandardCoder();
85 public static final String GUARD_ACTOR_NAME = "GUARD";
86 public static final String GUARD_OPERATION_NAME = "Decision";
87 public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
89 private final OperatorConfig config;
92 * Operation parameters.
94 protected final ControlLoopOperationParams params;
97 private final String fullName;
101 * Constructs the object.
103 * @param params operation parameters
104 * @param config configuration for this operation
106 public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) {
107 this.params = params;
108 this.config = config;
109 this.fullName = params.getActor() + "." + params.getOperation();
112 public Executor getBlockingExecutor() {
113 return config.getBlockingExecutor();
116 public String getActorName() {
117 return params.getActor();
120 public String getName() {
121 return params.getOperation();
125 public CompletableFuture<OperationOutcome> start() {
126 // allocate a controller for the entire operation
127 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
129 CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
130 if (preproc == null) {
131 // no preprocessor required - just start the operation
132 return startOperationAttempt(controller, 1);
136 * Do preprocessor first and then, if successful, start the operation. Note:
137 * operations create their own outcome, ignoring the outcome from any previous
140 * Wrap the preprocessor to ensure "stop" is propagated to it.
143 controller.wrap(preproc)
144 .exceptionally(fromException("preprocessor of operation"))
145 .thenCompose(handlePreprocessorFailure(controller))
146 .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
147 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
154 * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
155 * invokes the call-backs, marks the controller complete, and returns an incomplete
156 * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
159 * Assumes that no callbacks have been invoked yet.
161 * @param controller pipeline controller
162 * @return a function that checks the outcome status and continues, if successful, or
163 * indicates a failure otherwise
165 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
166 PipelineControllerFuture<OperationOutcome> controller) {
170 if (isSuccess(outcome)) {
171 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
172 return CompletableFuture.completedFuture(outcome);
175 logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
177 final Executor executor = params.getExecutor();
178 final CallbackManager callbacks = new CallbackManager();
180 // propagate "stop" to the callbacks
181 controller.add(callbacks);
183 final OperationOutcome outcome2 = params.makeOutcome();
185 // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
187 outcome2.setFinalOutcome(true);
188 outcome2.setResult(PolicyResult.FAILURE_GUARD);
189 outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
192 CompletableFuture.completedFuture(outcome2)
193 .whenCompleteAsync(callbackStarted(callbacks), executor)
194 .whenCompleteAsync(callbackCompleted(callbacks), executor)
195 .whenCompleteAsync(controller.delayedComplete(), executor);
198 return new CompletableFuture<>();
203 * Invokes the operation's preprocessor step(s) as a "future". This method simply
204 * returns {@code null}.
206 * This method assumes the following:
208 * <li>the operator is alive</li>
209 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
212 * @return a function that will start the preprocessor and returns its outcome, or
213 * {@code null} if this operation needs no preprocessor
215 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
220 * Invokes the operation's guard step(s) as a "future".
222 * This method assumes the following:
224 * <li>the operator is alive</li>
225 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
228 * @return a function that will start the guard checks and returns its outcome, or
229 * {@code null} if this operation has no guard
231 protected CompletableFuture<OperationOutcome> startGuardAsync() {
232 // get the guard payload
233 Map<String, Object> guardPayload = makeGuardPayload();
235 // wrap it in a "resource"
236 Map<String, Object> resource = new LinkedHashMap<>();
237 resource.put("guard", guardPayload);
239 Map<String, Object> payload = new LinkedHashMap<>();
240 payload.put("resource", resource);
243 * Note: can't use constants from actor.guard, because that would create a
244 * circular dependency.
246 return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
247 .payload(payload).build().start();
251 * Creates a payload to execute a guard operation.
253 * @return a new guard payload
255 protected Map<String, Object> makeGuardPayload() {
256 Map<String, Object> guard = new LinkedHashMap<>();
257 guard.put("actor", params.getActor());
258 guard.put("recipe", params.getOperation());
259 guard.put("target", params.getTargetEntity());
260 guard.put("requestId", params.getRequestId());
262 String clname = params.getContext().getEvent().getClosedLoopControlName();
263 if (clname != null) {
264 guard.put("clname", clname);
271 * Starts the operation attempt, with no preprocessor. When all retries complete, it
272 * will complete the controller.
274 * @param controller controller for all operation attempts
275 * @param attempt attempt number, typically starting with 1
276 * @return a future that will return the final result of all attempts
278 private CompletableFuture<OperationOutcome> startOperationAttempt(
279 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
281 // propagate "stop" to the operation attempt
282 controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
283 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
289 * Starts the operation attempt, without doing any retries.
291 * @param params operation parameters
292 * @param attempt attempt number, typically starting with 1
293 * @return a future that will return the result of a single operation attempt
295 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
297 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
299 final Executor executor = params.getExecutor();
300 final OperationOutcome outcome = params.makeOutcome();
301 final CallbackManager callbacks = new CallbackManager();
303 // this operation attempt gets its own controller
304 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
306 // propagate "stop" to the callbacks
307 controller.add(callbacks);
310 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
311 .whenCompleteAsync(callbackStarted(callbacks), executor)
312 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
315 // handle timeouts, if specified
316 long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
317 if (timeoutMillis > 0) {
318 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
319 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
323 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
324 * before callbackCompleted() is invoked.
326 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
327 * the pipeline as the last step anyway.
331 future.exceptionally(fromException("operation"))
332 .thenApply(setRetryFlag(attempt))
333 .whenCompleteAsync(callbackStarted(callbacks), executor)
334 .whenCompleteAsync(callbackCompleted(callbacks), executor)
335 .whenCompleteAsync(controller.delayedComplete(), executor);
342 * Determines if the outcome was successful.
344 * @param outcome outcome to examine
345 * @return {@code true} if the outcome was successful
347 protected boolean isSuccess(OperationOutcome outcome) {
348 return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
352 * Determines if the outcome was a failure for this operator.
354 * @param outcome outcome to examine, or {@code null}
355 * @return {@code true} if the outcome is not {@code null} and was a failure
356 * <i>and</i> was associated with this operator, {@code false} otherwise
358 protected boolean isActorFailed(OperationOutcome outcome) {
359 return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
363 * Determines if the given outcome is for this operation.
365 * @param outcome outcome to examine
366 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
368 protected boolean isSameOperation(OperationOutcome outcome) {
369 return OperationOutcome.isFor(outcome, getActorName(), getName());
373 * Invokes the operation as a "future". This method simply invokes
374 * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
375 * returning the result via a "future".
377 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
378 * the executor in the "params", as that may bring the background thread pool to a
379 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
382 * This method assumes the following:
384 * <li>the operator is alive</li>
385 * <li>verifyRunning() has been invoked</li>
386 * <li>callbackStarted() has been invoked</li>
387 * <li>the invoker will perform appropriate timeout checks</li>
388 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
391 * @param attempt attempt number, typically starting with 1
392 * @return a function that will start the operation and return its result when
395 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
397 return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
401 * Low-level method that performs the operation. This can make the same assumptions
402 * that are made by {@link #doOperationAsFuture()}. This particular method simply
403 * throws an {@link UnsupportedOperationException}.
405 * @param attempt attempt number, typically starting with 1
406 * @param operation the operation being performed
407 * @return the outcome of the operation
409 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
411 throw new UnsupportedOperationException("start operation " + getFullName());
415 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
416 * FAILURE, assuming the policy specifies retries and the retry count has been
419 * @param attempt latest attempt number, starting with 1
420 * @return a function to get the next future to execute
422 private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
424 return origOutcome -> {
425 // ensure we have a non-null outcome
426 OperationOutcome outcome;
427 if (origOutcome != null) {
428 outcome = origOutcome;
430 logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
431 outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE);
434 // ensure correct actor/operation
435 outcome.setActor(getActorName());
436 outcome.setOperation(getName());
438 // determine if we should retry, based on the result
439 if (outcome.getResult() != PolicyResult.FAILURE) {
440 // do not retry success or other failure types (e.g., exception)
441 outcome.setFinalOutcome(true);
445 int retry = getRetry(params.getRetry());
447 // no retries were specified
448 outcome.setFinalOutcome(true);
450 } else if (attempt <= retry) {
451 // have more retries - not the final outcome
452 outcome.setFinalOutcome(false);
456 * retries were specified and we've already tried them all - change to
459 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
460 outcome.setResult(PolicyResult.FAILURE_RETRIES);
461 outcome.setFinalOutcome(true);
469 * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
470 * was previously invoked, and thus that the "operation" is not {@code null}.
472 * @param controller controller for all of the retries
473 * @param attempt latest attempt number, starting with 1
474 * @return a function to get the next future to execute
476 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
477 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
479 return operation -> {
480 if (!isActorFailed(operation)) {
481 // wrong type or wrong operation - just leave it as is
482 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
483 controller.complete(operation);
484 return new CompletableFuture<>();
487 if (getRetry(params.getRetry()) <= 0) {
488 // no retries - already marked as FAILURE, so just return it
489 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
490 controller.complete(operation);
491 return new CompletableFuture<>();
495 * Retry the operation.
497 long waitMs = getRetryWaitMs();
498 logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
500 return sleep(waitMs, TimeUnit.MILLISECONDS)
501 .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
506 * Convenience method that starts a sleep(), running via a future.
508 * @param sleepTime time to sleep
509 * @param unit time unit
510 * @return a future that will complete when the sleep completes
512 protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
513 if (sleepTime <= 0) {
514 return CompletableFuture.completedFuture(null);
517 return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
521 * Converts an exception into an operation outcome, returning a copy of the outcome to
522 * prevent background jobs from changing it.
524 * @param type type of item throwing the exception
525 * @return a function that will convert an exception into an operation outcome
527 private Function<Throwable, OperationOutcome> fromException(String type) {
530 OperationOutcome outcome = params.makeOutcome();
532 if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
533 // do not include exception in the message, as it just clutters the log
534 logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
535 params.getRequestId());
537 logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
538 params.getRequestId(), thrown);
541 return setOutcome(outcome, thrown);
546 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
547 * any outstanding futures when one completes.
549 * @param futureMakers function to make a future. If the function returns
550 * {@code null}, then no future is created for that function. On the other
551 * hand, if the function throws an exception, then the previously created
552 * functions are canceled and the exception is re-thrown
553 * @return a future to cancel or await an outcome, or {@code null} if no futures were
554 * created. If this future is canceled, then all of the futures will be
557 public CompletableFuture<OperationOutcome> anyOf(
558 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
560 return anyOf(Arrays.asList(futureMakers));
564 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
565 * any outstanding futures when one completes.
567 * @param futureMakers function to make a future. If the function returns
568 * {@code null}, then no future is created for that function. On the other
569 * hand, if the function throws an exception, then the previously created
570 * functions are canceled and the exception is re-thrown
571 * @return a future to cancel or await an outcome, or {@code null} if no futures were
572 * created. If this future is canceled, then all of the futures will be
573 * canceled. Similarly, when this future completes, any incomplete futures
576 public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
578 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
580 CompletableFuture<OperationOutcome>[] futures =
581 attachFutures(controller, futureMakers, UnaryOperator.identity());
583 if (futures.length == 0) {
584 // no futures were started
588 if (futures.length == 1) {
592 CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
593 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
599 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
601 * @param futureMakers function to make a future. If the function returns
602 * {@code null}, then no future is created for that function. On the other
603 * hand, if the function throws an exception, then the previously created
604 * functions are canceled and the exception is re-thrown
605 * @return a future to cancel or await an outcome, or {@code null} if no futures were
606 * created. If this future is canceled, then all of the futures will be
609 public CompletableFuture<OperationOutcome> allOf(
610 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
612 return allOf(Arrays.asList(futureMakers));
616 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
618 * @param futureMakers function to make a future. If the function returns
619 * {@code null}, then no future is created for that function. On the other
620 * hand, if the function throws an exception, then the previously created
621 * functions are canceled and the exception is re-thrown
622 * @return a future to cancel or await an outcome, or {@code null} if no futures were
623 * created. If this future is canceled, then all of the futures will be
624 * canceled. Similarly, when this future completes, any incomplete futures
627 public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
628 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
630 Queue<OperationOutcome> outcomes = new LinkedList<>();
632 CompletableFuture<OperationOutcome>[] futures =
633 attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
634 synchronized (outcomes) {
635 outcomes.add(outcome);
640 if (futures.length == 0) {
641 // no futures were started
645 if (futures.length == 1) {
650 CompletableFuture.allOf(futures)
651 .thenApply(unused -> combineOutcomes(outcomes))
652 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
659 * Invokes the functions to create the futures and attaches them to the controller.
661 * @param controller master controller for all of the futures
662 * @param futureMakers futures to be attached to the controller
663 * @param adorn function that "adorns" the future, possible adding onto its pipeline.
664 * Returns the adorned future
665 * @return an array of futures, possibly zero-length. If the array is of size one,
666 * then that one item should be returned instead of the controller
668 private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
669 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
670 UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
672 if (futureMakers.isEmpty()) {
673 @SuppressWarnings("unchecked")
674 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
678 // the last, unadorned future that is created
679 CompletableFuture<OperationOutcome> lastFuture = null;
681 List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
684 for (var maker : futureMakers) {
686 CompletableFuture<OperationOutcome> future = maker.get();
687 if (future == null) {
691 // propagate "stop" to the future
692 controller.add(future);
694 futures.add(adorn.apply(future));
698 } catch (RuntimeException e) {
699 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
700 controller.cancel(false);
705 @SuppressWarnings("unchecked")
706 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
708 if (result.length == 1) {
709 // special case - return the unadorned future
710 result[0] = lastFuture;
714 return futures.toArray(result);
718 * Combines the outcomes from a set of tasks.
720 * @param outcomes outcomes to be examined
721 * @return the combined outcome
723 private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
725 // identify the outcome with the highest priority
726 OperationOutcome outcome = outcomes.remove();
727 int priority = detmPriority(outcome);
729 for (OperationOutcome outcome2 : outcomes) {
730 int priority2 = detmPriority(outcome2);
732 if (priority2 > priority) {
734 priority = priority2;
738 logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
739 (outcome == null ? null : outcome.getResult()), params.getRequestId());
745 * Determines the priority of an outcome based on its result.
747 * @param outcome outcome to examine, or {@code null}
748 * @return the outcome's priority
750 protected int detmPriority(OperationOutcome outcome) {
751 if (outcome == null || outcome.getResult() == null) {
755 switch (outcome.getResult()) {
762 case FAILURE_RETRIES:
768 case FAILURE_TIMEOUT:
771 case FAILURE_EXCEPTION:
778 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
779 * not created until the previous task completes. The pipeline returns the outcome of
780 * the last task executed.
782 * @param futureMakers functions to make the futures
783 * @return a future to cancel the sequence or await the outcome
785 public CompletableFuture<OperationOutcome> sequence(
786 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
788 return sequence(Arrays.asList(futureMakers));
792 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
793 * not created until the previous task completes. The pipeline returns the outcome of
794 * the last task executed.
796 * @param futureMakers functions to make the futures
797 * @return a future to cancel the sequence or await the outcome, or {@code null} if
798 * there were no tasks to perform
800 public CompletableFuture<OperationOutcome> sequence(
801 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
803 Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
805 CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
806 if (nextTask == null) {
811 if (queue.isEmpty()) {
812 // only one task - just return it rather than wrapping it in a controller
817 * multiple tasks - need a controller to stop whichever task is currently
820 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
821 final Executor executor = params.getExecutor();
824 controller.wrap(nextTask)
825 .thenCompose(nextTaskOnSuccess(controller, queue))
826 .whenCompleteAsync(controller.delayedComplete(), executor);
833 * Executes the next task in the queue, if the previous outcome was successful.
835 * @param controller pipeline controller
836 * @param taskQueue queue of tasks to be performed
837 * @return a future to execute the remaining tasks, or the current outcome, if it's a
838 * failure, or if there are no more tasks
840 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
841 PipelineControllerFuture<OperationOutcome> controller,
842 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
845 if (!isSuccess(outcome)) {
846 // return the failure
847 return CompletableFuture.completedFuture(outcome);
850 CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
851 if (nextTask == null) {
852 // no tasks - just return the success
853 return CompletableFuture.completedFuture(outcome);
859 .thenCompose(nextTaskOnSuccess(controller, taskQueue));
865 * Gets the next task from the queue, skipping those that are {@code null}.
867 * @param taskQueue task queue
868 * @return the next task, or {@code null} if the queue is now empty
870 private CompletableFuture<OperationOutcome> getNextTask(
871 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
873 Supplier<CompletableFuture<OperationOutcome>> maker;
875 while ((maker = taskQueue.poll()) != null) {
876 CompletableFuture<OperationOutcome> future = maker.get();
877 if (future != null) {
886 * Sets the start time of the operation and invokes the callback to indicate that the
887 * operation has started. Does nothing if the pipeline has been stopped.
889 * This assumes that the "outcome" is not {@code null}.
891 * @param callbacks used to determine if the start callback can be invoked
892 * @return a function that sets the start time and invokes the callback
894 private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
896 return (outcome, thrown) -> {
898 if (callbacks.canStart()) {
899 outcome.setStart(callbacks.getStartTime());
900 outcome.setEnd(null);
902 // pass a copy to the callback
903 OperationOutcome outcome2 = new OperationOutcome(outcome);
904 outcome2.setFinalOutcome(false);
905 params.callbackStarted(outcome2);
911 * Sets the end time of the operation and invokes the callback to indicate that the
912 * operation has completed. Does nothing if the pipeline has been stopped.
914 * This assumes that the "outcome" is not {@code null}.
916 * Note: the start time must be a reference rather than a plain value, because it's
917 * value must be gotten on-demand, when the returned function is executed at a later
920 * @param callbacks used to determine if the end callback can be invoked
921 * @return a function that sets the end time and invokes the callback
923 private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
925 return (outcome, thrown) -> {
926 if (callbacks.canEnd()) {
927 outcome.setStart(callbacks.getStartTime());
928 outcome.setEnd(callbacks.getEndTime());
930 // pass a copy to the callback
931 params.callbackCompleted(new OperationOutcome(outcome));
937 * Sets an operation's outcome and message, based on a throwable.
939 * @param operation operation to be updated
940 * @return the updated operation
942 public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
943 PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
944 return setOutcome(operation, result);
948 * Sets an operation's outcome and default message based on the result.
950 * @param operation operation to be updated
951 * @param result result of the operation
952 * @return the updated operation
954 public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
955 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
956 operation.setResult(result);
957 operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
958 : ControlLoopOperation.FAILED_MSG);
964 * Determines if a throwable is due to a timeout.
966 * @param thrown throwable of interest
967 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
969 protected boolean isTimeout(Throwable thrown) {
970 if (thrown instanceof CompletionException) {
971 thrown = thrown.getCause();
974 return (thrown instanceof TimeoutException);
978 * Logs a response. If the response is not of type, String, then it attempts to
979 * pretty-print it into JSON before logging.
981 * @param direction IN or OUT
982 * @param infra communication infrastructure on which it was published
983 * @param source source name (e.g., the URL or Topic name)
984 * @param response response to be logged
985 * @return the JSON text that was logged
987 public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
990 if (response == null) {
992 } else if (response instanceof String) {
993 json = response.toString();
995 json = makeCoder().encode(response, true);
998 } catch (CoderException e) {
999 String type = (direction == EventType.IN ? "response" : "request");
1000 logger.warn("cannot pretty-print {}", type, e);
1001 json = response.toString();
1004 logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
1009 // these may be overridden by subclasses or junit tests
1012 * Gets the retry count.
1014 * @param retry retry, extracted from the parameters, or {@code null}
1015 * @return the number of retries, or {@code 0} if no retries were specified
1017 protected int getRetry(Integer retry) {
1018 return (retry == null ? 0 : retry);
1022 * Gets the retry wait, in milliseconds.
1024 * @return the retry wait, in milliseconds
1026 protected long getRetryWaitMs() {
1027 return DEFAULT_RETRY_WAIT_MS;
1031 * Gets the operation timeout.
1033 * @param timeoutSec timeout, in seconds, extracted from the parameters, or
1035 * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
1038 protected long getTimeoutMs(Integer timeoutSec) {
1039 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
1042 // these may be overridden by junit tests
1044 protected Coder makeCoder() {